]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_api/search/db_searches.py
release 4.5.0.post7
[nominatim.git] / src / nominatim_api / search / db_searches.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Implementation of the actual database accesses for forward search.
9 """
10 from typing import List, Tuple, AsyncIterator, Dict, Any, Callable, cast
11 import abc
12
13 import sqlalchemy as sa
14
15 from ..typing import SaFromClause, SaScalarSelect, SaColumn, \
16                      SaExpression, SaSelect, SaLambdaSelect, SaRow, SaBind
17 from ..sql.sqlalchemy_types import Geometry, IntArray
18 from ..connection import SearchConnection
19 from ..types import SearchDetails, DataLayer, GeometryFormat, Bbox
20 from .. import results as nres
21 from .db_search_fields import SearchData, WeightedCategories
22
23
def no_index(expr: SaColumn) -> SaColumn:
    """ Disguise a column expression so that the query planner does
        not consider it for an index lookup.

        The expression is passed through `coalesce(NULL, expr)`.
    """
    shielded: SaColumn = sa.func.coalesce(sa.null(), expr)
    return shielded
29
30
31 def _details_to_bind_params(details: SearchDetails) -> Dict[str, Any]:
32     """ Create a dictionary from search parameters that can be used
33         as bind parameter for SQL execute.
34     """
35     return {'limit': details.max_results,
36             'min_rank': details.min_rank,
37             'max_rank': details.max_rank,
38             'viewbox': details.viewbox,
39             'viewbox2': details.viewbox_x2,
40             'near': details.near,
41             'near_radius': details.near_radius,
42             'excluded': details.excluded,
43             'countries': details.countries}
44
45
# Bind parameters shared by the queries in this module. Each name matches
# a key of the dictionary produced by _details_to_bind_params().
# The viewbox and near parameters are declared with the Geometry type.
LIMIT_PARAM: SaBind = sa.bindparam('limit')
MIN_RANK_PARAM: SaBind = sa.bindparam('min_rank')
MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank')
VIEWBOX_PARAM: SaBind = sa.bindparam('viewbox', type_=Geometry)
VIEWBOX2_PARAM: SaBind = sa.bindparam('viewbox2', type_=Geometry)
NEAR_PARAM: SaBind = sa.bindparam('near', type_=Geometry)
NEAR_RADIUS_PARAM: SaBind = sa.bindparam('near_radius')
COUNTRIES_PARAM: SaBind = sa.bindparam('countries')
54
55
def filter_by_area(sql: SaSelect, t: SaFromClause,
                   details: SearchDetails, avoid_index: bool = False) -> SaSelect:
    """ Restrict the query to the search area requested in `details`.

        A distance condition is added when a near point and a near
        radius are both set. A viewbox condition is added when a viewbox
        is set and marked as bounded. Both conditions apply to the
        `geometry` column of `t`.

        With `avoid_index` set, the distance condition always uses
        ST_Distance and the viewbox condition is created with
        `use_index=False`.
    """
    if details.near is not None and details.near_radius is not None:
        index_lookup = details.near_radius < 0.1 and not avoid_index
        near_condition = (t.c.geometry.within_distance(NEAR_PARAM, NEAR_RADIUS_PARAM)
                          if index_lookup
                          else t.c.geometry.ST_Distance(NEAR_PARAM) <= NEAR_RADIUS_PARAM)
        sql = sql.where(near_condition)

    if details.viewbox is None or not details.bounded_viewbox:
        return sql

    index_lookup = not avoid_index and details.viewbox.area < 0.2
    return sql.where(t.c.geometry.intersects(VIEWBOX_PARAM, use_index=index_lookup))
72
73
def _exclude_places(t: SaFromClause) -> Callable[[], SaExpression]:
    """ Return a lambda that creates a condition rejecting all rows of `t`
        whose place_id is contained in the bind parameter 'excluded'.
    """
    return lambda: t.c.place_id.not_in(sa.bindparam('excluded'))
76
77
def _select_placex(t: SaFromClause) -> SaSelect:
    """ Create a SELECT over `t` with the columns needed to build a
        search result from a placex row. The geometry is returned
        in the column 'bbox' after being run through ST_Expand(0).
    """
    plain_columns = (
        t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
        t.c.class_, t.c.type,
        t.c.address, t.c.extratags,
        t.c.housenumber, t.c.postcode, t.c.country_code,
        t.c.wikipedia,
        t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
        t.c.linked_place_id, t.c.admin_level,
        t.c.centroid,
    )
    bbox = t.c.geometry.ST_Expand(0).label('bbox')

    return sa.select(*plain_columns, bbox)
88
89
def _add_geometry_columns(sql: SaLambdaSelect, col: SaColumn, details: SearchDetails) -> SaSelect:
    """ Add one output column to `sql` for every geometry format
        requested in `details.geometry_output`. All of them are computed
        from `col`, which is first run through
        ST_SimplifyPreserveTopology when a positive simplification
        value is set in `details`.
    """
    geom = col
    if details.geometry_simplification > 0.0:
        geom = sa.func.ST_SimplifyPreserveTopology(col, details.geometry_simplification)

    # (format flag, SQL function, additional function arguments, column label)
    formats = ((GeometryFormat.GEOJSON, 'ST_AsGeoJSON', (7,), 'geometry_geojson'),
               (GeometryFormat.TEXT, 'ST_AsText', (), 'geometry_text'),
               (GeometryFormat.KML, 'ST_AsKML', (7,), 'geometry_kml'),
               (GeometryFormat.SVG, 'ST_AsSVG', (0, 7), 'geometry_svg'))

    requested = details.geometry_output
    extra_columns = [getattr(sa.func, funcname)(geom, *extra_args).label(label)
                     for flag, funcname, extra_args, label in formats
                     if requested & flag]

    return sql.add_columns(*extra_columns)
106
107
def _make_interpolation_subquery(table: SaFromClause, inner: SaFromClause,
                                 numerals: List[int], details: SearchDetails) -> SaScalarSelect:
    """ Build a scalar subquery that aggregates the place_ids of all rows
        in `table` whose parent is the current row of `inner` and which
        match at least one of the house numbers in `numerals`.

        A number matches when it lies between startnumber and endnumber
        and its offset from startnumber is a multiple of step.
        Excluded places are filtered out when `details.excluded` is set.
    """
    def _matches(number: int) -> Tuple[SaExpression, SaExpression]:
        # range condition and step condition for a single house number
        return (sa.between(number, table.c.startnumber, table.c.endnumber),
                (number - table.c.startnumber) % table.c.step == 0)

    query = sa.select(sa.func.ArrayAgg(table.c.place_id))
    query = query.where(table.c.parent_place_id == inner.c.place_id)

    if len(numerals) == 1:
        in_range, on_step = _matches(numerals[0])
        query = query.where(in_range).where(on_step)
    else:
        query = query.where(sa.or_(*(sa.and_(*_matches(n)) for n in numerals)))

    if details.excluded:
        query = query.where(_exclude_places(table))

    return query.scalar_subquery()
126
127
def _filter_by_layer(table: SaFromClause, layers: DataLayer) -> SaColumn:
    """ Create the condition that restricts the rows of `table` to
        the data layers set in `layers`.

        The result is a single expression: either the only condition
        created or an OR over all conditions.
    """
    conditions: List[SaExpression] = []

    # Conditions for places with an address rank.
    if layers & DataLayer.ADDRESS:
        if layers & DataLayer.POI:
            conditions.append(no_index(table.c.rank_address).between(1, 30))
        else:
            conditions.append(no_index(table.c.rank_address).between(1, 29))
            conditions.append(sa.func.IsAddressPoint(table))
    elif layers & DataLayer.POI:
        conditions.append(sa.and_(no_index(table.c.rank_address) == 30,
                                  table.c.class_.not_in(('place', 'building'))))

    # Conditions for places with address rank 0. With the MANMADE layer,
    # the classes of the layers that are NOT requested are excluded.
    # Without it, only the classes of the requested layers are included.
    manmade = bool(layers & DataLayer.MANMADE)
    classes: List[str] = []
    if bool(layers & DataLayer.RAILWAY) != manmade:
        classes.append('railway')
    if bool(layers & DataLayer.NATURAL) != manmade:
        classes.extend(('natural', 'water', 'waterway'))

    if manmade:
        class_condition = table.c.class_.not_in(tuple(classes))
    else:
        class_condition = table.c.class_.in_(tuple(classes))
    conditions.append(sa.and_(class_condition,
                              no_index(table.c.rank_address) == 0))

    return conditions[0] if len(conditions) == 1 else sa.or_(*conditions)
160
161
def _interpolated_position(table: SaFromClause, nr: SaColumn) -> SaColumn:
    """ Create the expression for the position of house number `nr`
        on the line geometry of an interpolation row, labelled 'centroid'.

        When start and end number are equal, the centroid of the line
        is used instead of an interpolated point.
    """
    first = table.c.startnumber
    last = table.c.endnumber

    # Relative position of the number within the range of the line.
    fraction = sa.cast(nr - first, sa.Float) / (last - first)

    position = sa.case((last == first, table.c.linegeo.ST_Centroid()),
                       else_=table.c.linegeo.ST_LineInterpolatePoint(fraction))

    return position.label('centroid')
167
168
async def _get_placex_housenumbers(conn: SearchConnection,
                                   place_ids: List[int],
                                   details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
    """ Load the placex rows with the given place IDs and yield each
        of them as a search result with its bounding box set.
    """
    placex = conn.t.placex

    query = _select_placex(placex).add_columns(placex.c.importance)
    query = query.where(placex.c.place_id.in_(place_ids))

    if details.geometry_output:
        query = _add_geometry_columns(query, placex.c.geometry, details)

    rows = await conn.execute(query)
    for row in rows:
        found = nres.create_from_placex_row(row, nres.SearchResult)
        assert found
        found.bbox = Bbox.from_wkb(row.bbox)
        yield found
184
185
def _int_list_to_subquery(inp: List[int]) -> 'sa.Subquery':
    """ Turn the given list of integers into a subquery which returns
        one row per list element in the column 'nr'.

        The list is handed to the database as a JSON array and unpacked
        with the SQL function JsonArrayEach.
    """
    json_list = sa.type_coerce(inp, sa.JSON)
    elements = sa.func.JsonArrayEach(json_list)\
                 .table_valued(sa.column('value', type_=sa.JSON))

    # The JSON value is cast to text first and then to integer.
    as_integer = sa.cast(sa.cast(elements.c.value, sa.Text), sa.Integer)

    return sa.select(as_integer.label('nr')).subquery()
193
194
async def _get_osmline(conn: SearchConnection, place_ids: List[int],
                       numerals: List[int],
                       details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
    """ Yield a search result for every combination of an osmline row
        from `place_ids` and a house number from `numerals` that lies
        between the start and end number of the row. The step of the
        interpolation is not checked in this query.

        The position of the result is interpolated along the line.
    """
    osmline = conn.t.osmline
    numbers = _int_list_to_subquery(numerals)

    columns = (osmline.c.place_id, osmline.c.osm_id,
               osmline.c.parent_place_id, osmline.c.address,
               numbers.c.nr.label('housenumber'),
               _interpolated_position(osmline, numbers.c.nr),
               osmline.c.postcode, osmline.c.country_code)
    in_range = numbers.c.nr.between(osmline.c.startnumber, osmline.c.endnumber)

    query = sa.select(*columns).where(osmline.c.place_id.in_(place_ids))
    query = query.join(numbers, in_range)

    if details.geometry_output:
        # Geometry output is computed from the interpolated point.
        base = query.subquery()
        query = _add_geometry_columns(sa.select(base), base.c.centroid, details)

    rows = await conn.execute(query)
    for row in rows:
        found = nres.create_from_osmline_row(row, nres.SearchResult)
        assert found
        yield found
217
218
async def _get_tiger(conn: SearchConnection, place_ids: List[int],
                     numerals: List[int], osm_id: int,
                     details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
    """ Yield a search result for every combination of a tiger row
        from `place_ids` and a house number from `numerals` that lies
        between the start and end number of the row. The step of the
        interpolation is not checked in this query.

        All results are reported as OSM type 'W' with the given `osm_id`.
        The position of the result is interpolated along the line.
    """
    tiger = conn.t.tiger
    numbers = _int_list_to_subquery(numerals)

    columns = (tiger.c.place_id, tiger.c.parent_place_id,
               sa.literal('W').label('osm_type'),
               sa.literal(osm_id).label('osm_id'),
               numbers.c.nr.label('housenumber'),
               _interpolated_position(tiger, numbers.c.nr),
               tiger.c.postcode)
    in_range = numbers.c.nr.between(tiger.c.startnumber, tiger.c.endnumber)

    query = sa.select(*columns).where(tiger.c.place_id.in_(place_ids))
    query = query.join(numbers, in_range)

    if details.geometry_output:
        # Geometry output is computed from the interpolated point.
        base = query.subquery()
        query = _add_geometry_columns(sa.select(base), base.c.centroid, details)

    rows = await conn.execute(query)
    for row in rows:
        found = nres.create_from_tiger_row(row, nres.SearchResult)
        assert found
        yield found
241
242
class AbstractSearch(abc.ABC):
    """ Encapsulation of a single lookup in the database.

        Each search carries a penalty and must implement lookup().
    """
    # Default search priority; some subclasses in this module override it.
    SEARCH_PRIO: int = 2

    def __init__(self, penalty: float) -> None:
        self.penalty = penalty

    @abc.abstractmethod
    async def lookup(self, conn: SearchConnection,
                     details: SearchDetails) -> nres.SearchResults:
        """ Find results for the search in the database.
        """
256
257
class NearSearch(AbstractSearch):
    """ Category search of a place type near the result of another search.
    """
    def __init__(self, penalty: float, categories: WeightedCategories,
                 search: AbstractSearch) -> None:
        super().__init__(penalty)
        # Search that supplies the places around which is searched.
        self.search = search
        # Categories to look for, iterated as (category, penalty) pairs.
        self.categories = categories

    async def lookup(self, conn: SearchConnection,
                     details: SearchDetails) -> nres.SearchResults:
        """ Find results for the search in the database.

            Runs the base search first, then looks for places of the
            configured categories around the selected base results.
            The result is empty when the base search finds nothing or
            none of its results passes the filter below.
        """
        results = nres.SearchResults()
        base = await self.search.lookup(conn, details)

        if not base:
            return results

        # Order by accuracy, then rank_search. The first entry determines
        # the accuracy limit and the rank window for the base results.
        base.sort(key=lambda r: (r.accuracy, r.rank_search))
        max_accuracy = base[0].accuracy + 0.5
        if base[0].rank_address == 0:
            min_rank = 0
            max_rank = 0
        elif base[0].rank_address < 26:
            min_rank = 1
            max_rank = min(25, base[0].rank_address + 4)
        else:
            min_rank = 26
            max_rank = 30
        # Keep only placex results within the accuracy limit and the rank
        # window that have a bounding box with an area below 20.
        base = nres.SearchResults(r for r in base
                                  if (r.source_table == nres.SourceTable.PLACEX
                                      and r.accuracy <= max_accuracy
                                      and r.bbox and r.bbox.area < 20
                                      and r.rank_address >= min_rank
                                      and r.rank_address <= max_rank))

        if base:
            # At most the first five base results are used for the lookup.
            baseids = [b.place_id for b in base[:5] if b.place_id]

            for category, penalty in self.categories:
                await self.lookup_category(results, conn, baseids, category, penalty, details)
                # Remaining categories are skipped once enough results
                # have been collected.
                if len(results) >= details.max_results:
                    break

        return results

    async def lookup_category(self, results: nres.SearchResults,
                              conn: SearchConnection, ids: List[int],
                              category: Tuple[str, str], penalty: float,
                              details: SearchDetails) -> None:
        """ Find places of the given category near the list of
            place ids and add the results to 'results'.

            `penalty` is the penalty of the category. It is added to the
            penalty of the search to form the accuracy of each result.
        """
        table = await conn.get_class_table(*category)

        # Alias of placex that represents the base places.
        tgeom = conn.t.placex.alias('pgeom')

        if table is None:
            # No classtype table available, do a simplified lookup in placex.
            table = conn.t.placex
            sql = sa.select(table.c.place_id,
                            sa.func.min(tgeom.c.centroid.ST_Distance(table.c.centroid))
                              .label('dist'))\
                    .join(tgeom, table.c.geometry.intersects(tgeom.c.centroid.ST_Expand(0.01)))\
                    .where(table.c.class_ == category[0])\
                    .where(table.c.type == category[1])
        else:
            # Use classtype table. We can afford to use a larger
            # radius for the lookup.
            sql = sa.select(table.c.place_id,
                            sa.func.min(tgeom.c.centroid.ST_Distance(table.c.centroid))
                              .label('dist'))\
                    .join(tgeom,
                          table.c.centroid.ST_CoveredBy(
                              sa.case((sa.and_(tgeom.c.rank_address > 9,
                                               tgeom.c.geometry.is_area()),
                                       tgeom.c.geometry),
                                      else_=tgeom.c.centroid.ST_Expand(0.05))))

        # For each candidate place compute the minimum distance to any
        # of the base places.
        inner = sql.where(tgeom.c.place_id.in_(ids))\
                   .group_by(table.c.place_id).subquery()

        # Fetch the full placex rows of the candidates, closest first.
        # The negated distance is reported in the importance column.
        t = conn.t.placex
        sql = _select_placex(t).add_columns((-inner.c.dist).label('importance'))\
                               .join(inner, inner.c.place_id == t.c.place_id)\
                               .order_by(inner.c.dist)

        sql = sql.where(no_index(t.c.rank_address).between(MIN_RANK_PARAM, MAX_RANK_PARAM))
        if details.countries:
            sql = sql.where(t.c.country_code.in_(COUNTRIES_PARAM))
        if details.excluded:
            sql = sql.where(_exclude_places(t))
        if details.layers is not None:
            sql = sql.where(_filter_by_layer(t, details.layers))

        sql = sql.limit(LIMIT_PARAM)
        for row in await conn.execute(sql, _details_to_bind_params(details)):
            result = nres.create_from_placex_row(row, nres.SearchResult)
            assert result
            result.accuracy = self.penalty + penalty
            result.bbox = Bbox.from_wkb(row.bbox)
            results.append(result)
361
362
class PoiSearch(AbstractSearch):
    """ Category search in a geographic area.
    """
    def __init__(self, sdata: SearchData) -> None:
        super().__init__(sdata.penalty)
        self.qualifiers = sdata.qualifiers
        self.countries = sdata.countries

    async def lookup(self, conn: SearchConnection,
                     details: SearchDetails) -> nres.SearchResults:
        """ Find results for the search in the database.

            For a near search with a radius below 0.2 the placex table
            is queried directly. In all other cases the class type
            tables of the requested categories are used. Categories
            without such a table yield no results.
        """
        bind_params = _details_to_bind_params(details)
        t = conn.t.placex

        rows: List[SaRow] = []

        if details.near and details.near_radius is not None and details.near_radius < 0.2:
            # simply search in placex table
            # The query returns the negated distance to the near point as
            # importance and is ordered by that distance. The comparison
            # with '== None' is a SQLAlchemy expression, not a Python test.
            def _base_query() -> SaSelect:
                return _select_placex(t) \
                           .add_columns((-t.c.centroid.ST_Distance(NEAR_PARAM))
                                        .label('importance'))\
                           .where(t.c.linked_place_id == None) \
                           .where(t.c.geometry.within_distance(NEAR_PARAM, NEAR_RADIUS_PARAM)) \
                           .order_by(t.c.centroid.ST_Distance(NEAR_PARAM)) \
                           .limit(LIMIT_PARAM)

            classtype = self.qualifiers.values
            if len(classtype) == 1:
                # A single category is queried with a lambda statement.
                cclass, ctype = classtype[0]
                sql: SaLambdaSelect = sa.lambda_stmt(
                    lambda: _base_query().where(t.c.class_ == cclass)
                                         .where(t.c.type == ctype))
            else:
                # Multiple categories: match any of the class/type pairs.
                sql = _base_query().where(sa.or_(*(sa.and_(t.c.class_ == cls, t.c.type == typ)
                                                   for cls, typ in classtype)))

            if self.countries:
                sql = sql.where(t.c.country_code.in_(self.countries.values))

            if details.viewbox is not None and details.bounded_viewbox:
                sql = sql.where(t.c.geometry.intersects(VIEWBOX_PARAM))

            rows.extend(await conn.execute(sql, bind_params))
        else:
            # use the class type tables
            # One query per category; each query has its own limit.
            for category in self.qualifiers.values:
                table = await conn.get_class_table(*category)
                if table is not None:
                    sql = _select_placex(t)\
                               .add_columns(t.c.importance)\
                               .join(table, t.c.place_id == table.c.place_id)\
                               .where(t.c.class_ == category[0])\
                               .where(t.c.type == category[1])

                    if details.viewbox is not None and details.bounded_viewbox:
                        sql = sql.where(table.c.centroid.intersects(VIEWBOX_PARAM))

                    if details.near and details.near_radius is not None:
                        sql = sql.order_by(table.c.centroid.ST_Distance(NEAR_PARAM))\
                                 .where(table.c.centroid.within_distance(NEAR_PARAM,
                                                                         NEAR_RADIUS_PARAM))

                    if self.countries:
                        sql = sql.where(t.c.country_code.in_(self.countries.values))

                    sql = sql.limit(LIMIT_PARAM)
                    rows.extend(await conn.execute(sql, bind_params))

        results = nres.SearchResults()
        for row in rows:
            result = nres.create_from_placex_row(row, nres.SearchResult)
            assert result
            # Accuracy is the search penalty plus the penalty of the
            # category the row belongs to.
            result.accuracy = self.penalty + self.qualifiers.get_penalty((row.class_, row.type))
            result.bbox = Bbox.from_wkb(row.bbox)
            results.append(result)

        return results
442
443
class CountrySearch(AbstractSearch):
    """ Search for a country name or country code.
    """
    SEARCH_PRIO = 0

    def __init__(self, sdata: SearchData) -> None:
        super().__init__(sdata.penalty)
        self.countries = sdata.countries

    async def lookup(self, conn: SearchConnection,
                     details: SearchDetails) -> nres.SearchResults:
        """ Find results for the search in the database.

            Looks for placex rows with address rank 4 and one of the
            requested country codes. When nothing is found, the
            fallback country tables are consulted.

            Note that `details.min_rank` and `details.max_rank` are
            modified in place when results are returned.
        """
        t = conn.t.placex

        ccodes = self.countries.values
        sql = _select_placex(t)\
            .add_columns(t.c.importance)\
            .where(t.c.country_code.in_(ccodes))\
            .where(t.c.rank_address == 4)

        if details.geometry_output:
            sql = _add_geometry_columns(sql, t.c.geometry, details)

        if details.excluded:
            sql = sql.where(_exclude_places(t))

        sql = filter_by_area(sql, t, details)

        results = nres.SearchResults()
        for row in await conn.execute(sql, _details_to_bind_params(details)):
            result = nres.create_from_placex_row(row, nres.SearchResult)
            assert result
            result.accuracy = self.penalty + self.countries.get_penalty(row.country_code, 5.0)
            result.bbox = Bbox.from_wkb(row.bbox)
            results.append(result)

        if not results:
            results = await self.lookup_in_country_table(conn, details)

        if results:
            # Adjust the rank range of the caller's details object.
            # min_rank is computed from the max_rank before it is capped.
            details.min_rank = min(5, details.max_rank)
            details.max_rank = min(25, details.max_rank)

        return results

    async def lookup_in_country_table(self, conn: SearchConnection,
                                      details: SearchDetails) -> nres.SearchResults:
        """ Look up the country in the fallback country tables.

            Returns an empty result when `details.excluded` is set.
        """
        # Avoid the fallback search when this is a more search. Country results
        # usually are in the first batch of results and it is not possible
        # to exclude these fallbacks.
        if details.excluded:
            return nres.SearchResults()

        t = conn.t.country_name
        tgrid = conn.t.country_grid

        # Collect the grid geometries per country to obtain a centroid
        # and a bounding box for each requested country.
        sql = sa.select(tgrid.c.country_code,
                        tgrid.c.geometry.ST_Centroid().ST_Collect().ST_Centroid()
                             .label('centroid'),
                        tgrid.c.geometry.ST_Collect().ST_Expand(0).label('bbox'))\
                .where(tgrid.c.country_code.in_(self.countries.values))\
                .group_by(tgrid.c.country_code)

        sql = filter_by_area(sql, tgrid, details, avoid_index=True)

        sub = sql.subquery('grid')

        # The name is merged from the columns name and derived_name
        # of the country_name table.
        sql = sa.select(t.c.country_code,
                        t.c.name.merge(t.c.derived_name).label('name'),
                        sub.c.centroid, sub.c.bbox)\
                .join(sub, t.c.country_code == sub.c.country_code)

        if details.geometry_output:
            # Geometry output is computed from the centroid only.
            sql = _add_geometry_columns(sql, sub.c.centroid, details)

        results = nres.SearchResults()
        for row in await conn.execute(sql, _details_to_bind_params(details)):
            result = nres.create_from_country_row(row, nres.SearchResult)
            assert result
            result.bbox = Bbox.from_wkb(row.bbox)
            result.accuracy = self.penalty + self.countries.get_penalty(row.country_code, 5.0)
            results.append(result)

        return results
531
532
class PostcodeSearch(AbstractSearch):
    """ Search for a postcode.
    """
    def __init__(self, extra_penalty: float, sdata: SearchData) -> None:
        # The penalty of the search is the sum of both penalties.
        super().__init__(sdata.penalty + extra_penalty)
        self.countries = sdata.countries
        self.postcodes = sdata.postcodes
        self.lookups = sdata.lookups
        self.rankings = sdata.rankings

    async def lookup(self, conn: SearchConnection,
                     details: SearchDetails) -> nres.SearchResults:
        """ Find results for the search in the database.

            Matching rows are selected from the postcode table. For each
            row a postal code boundary relation with the same country code
            and postcode is looked up in placex and returned instead of
            the postcode row when one exists.
        """
        t = conn.t.postcode
        pcs = self.postcodes.values

        sql = sa.select(t.c.place_id, t.c.parent_place_id,
                        t.c.rank_search, t.c.rank_address,
                        t.c.postcode, t.c.country_code,
                        t.c.geometry.label('centroid'))\
                .where(t.c.postcode.in_(pcs))

        if details.geometry_output:
            sql = _add_geometry_columns(sql, t.c.geometry, details)

        # SQL expression for the accuracy of a row. Starts with the
        # penalty of the search and is extended below.
        penalty: SaExpression = sa.literal(self.penalty)

        if details.viewbox is not None and not details.bounded_viewbox:
            # Unbounded viewbox: no penalty inside the viewbox, 0.5 inside
            # the second viewbox parameter, 1.0 everywhere else.
            penalty += sa.case((t.c.geometry.intersects(VIEWBOX_PARAM), 0.0),
                               (t.c.geometry.intersects(VIEWBOX2_PARAM), 0.5),
                               else_=1.0)

        if details.near is not None:
            sql = sql.order_by(t.c.geometry.ST_Distance(NEAR_PARAM))

        sql = filter_by_area(sql, t, details)

        if self.countries:
            sql = sql.where(t.c.country_code.in_(self.countries.values))

        if details.excluded:
            sql = sql.where(_exclude_places(t))

        if self.lookups:
            # Require that the parent of the postcode contains the lookup
            # tokens in its name vector or its address vector.
            assert len(self.lookups) == 1
            tsearch = conn.t.search_name
            sql = sql.where(tsearch.c.place_id == t.c.parent_place_id)\
                     .where((tsearch.c.name_vector + tsearch.c.nameaddress_vector)
                            .contains(sa.type_coerce(self.lookups[0].tokens,
                                                     IntArray)))

        for ranking in self.rankings:
            penalty += ranking.sql_penalty(conn.t.search_name)
        # Add the penalty assigned to the matched postcode.
        penalty += sa.case(*((t.c.postcode == v, p) for v, p in self.postcodes),
                           else_=1.0)

        sql = sql.add_columns(penalty.label('accuracy'))
        sql = sql.order_by('accuracy').limit(LIMIT_PARAM)

        results = nres.SearchResults()
        for row in await conn.execute(sql, _details_to_bind_params(details)):
            # One additional placex query per postcode row.
            p = conn.t.placex
            placex_sql = _select_placex(p)\
                .add_columns(p.c.importance)\
                .where(sa.text("""class = 'boundary'
                                  AND type = 'postal_code'
                                  AND osm_type = 'R'"""))\
                .where(p.c.country_code == row.country_code)\
                .where(p.c.postcode == row.postcode)\
                .limit(1)

            if details.geometry_output:
                placex_sql = _add_geometry_columns(placex_sql, p.c.geometry, details)

            # The loop body runs at most once because of the limit above.
            # The else branch is taken when placex has no matching row.
            for prow in await conn.execute(placex_sql, _details_to_bind_params(details)):
                result = nres.create_from_placex_row(prow, nres.SearchResult)
                if result is not None:
                    result.bbox = Bbox.from_wkb(prow.bbox)
                break
            else:
                result = nres.create_from_postcode_row(row, nres.SearchResult)

            assert result
            # The exclusion is checked again here because the result may
            # come from placex and carry a different place_id.
            if result.place_id not in details.excluded:
                result.accuracy = row.accuracy
                results.append(result)

        return results
622
623
624 class PlaceSearch(AbstractSearch):
625     """ Generic search for an address or named place.
626     """
627     SEARCH_PRIO = 1
628
    def __init__(self, extra_penalty: float, sdata: SearchData, expected_count: int) -> None:
        """ Create the search from the search data in `sdata`.

            The penalty of the search is the sum of the penalty in `sdata`
            and `extra_penalty`. `expected_count` is compared against
            fixed thresholds when the search_name query is built.
        """
        super().__init__(sdata.penalty + extra_penalty)
        self.countries = sdata.countries
        self.postcodes = sdata.postcodes
        self.housenumbers = sdata.housenumbers
        self.qualifiers = sdata.qualifiers
        self.lookups = sdata.lookups
        self.rankings = sdata.rankings
        self.expected_count = expected_count
638
    def _inner_search_name_cte(self, conn: SearchConnection,
                               details: SearchDetails) -> 'sa.CTE':
        """ Create a subquery that preselects the rows in the search_name
            table.

            The result is a common table expression named 'searches' with
            the columns place_id, search_rank, address_rank, country_code,
            centroid, importance and penalty.
        """
        t = conn.t.search_name

        # SQL expression for the penalty of a row: the penalty of the
        # search plus the penalties computed by the rankings.
        penalty: SaExpression = sa.literal(self.penalty)
        for ranking in self.rankings:
            penalty += ranking.sql_penalty(t)

        # Rows without a positive importance get a substitute value
        # derived from their search rank.
        sql = sa.select(t.c.place_id, t.c.search_rank, t.c.address_rank,
                        t.c.country_code, t.c.centroid,
                        t.c.name_vector, t.c.nameaddress_vector,
                        sa.case((t.c.importance > 0, t.c.importance),
                                else_=0.40001-(sa.cast(t.c.search_rank, sa.Float())/75))
                          .label('importance'),
                        penalty.label('penalty'))

        for lookup in self.lookups:
            sql = sql.where(lookup.sql_condition(t))

        if self.countries:
            sql = sql.where(t.c.country_code.in_(self.countries.values))

        if self.postcodes:
            # if a postcode is given, don't search for state or country level objects
            sql = sql.where(t.c.address_rank > 9)
            if self.expected_count > 10000:
                # Many results expected. Restrict by postcode.
                tpc = conn.t.postcode
                sql = sql.where(sa.select(tpc.c.postcode)
                                  .where(tpc.c.postcode.in_(self.postcodes.values))
                                  .where(t.c.centroid.within_distance(tpc.c.geometry, 0.4))
                                  .exists())

        if details.viewbox is not None:
            if details.bounded_viewbox:
                sql = sql.where(t.c.centroid
                                   .intersects(VIEWBOX_PARAM,
                                               use_index=details.viewbox.area < 0.2))
            elif not self.postcodes and not self.housenumbers and self.expected_count >= 10000:
                # Unbounded viewbox with many expected results and no
                # postcode or house number: restrict to the second
                # viewbox parameter.
                sql = sql.where(t.c.centroid
                                   .intersects(VIEWBOX2_PARAM,
                                               use_index=details.viewbox.area < 0.5))

        if details.near is not None and details.near_radius is not None:
            if details.near_radius < 0.1:
                sql = sql.where(t.c.centroid.within_distance(NEAR_PARAM,
                                                             NEAR_RADIUS_PARAM))
            else:
                sql = sql.where(t.c.centroid
                                 .ST_Distance(NEAR_PARAM) < NEAR_RADIUS_PARAM)

        if self.housenumbers:
            # With house numbers only a fixed address rank range is
            # searched. Exclusions and the rank limits from `details`
            # are not applied in this query.
            sql = sql.where(t.c.address_rank.between(16, 30))
        else:
            if details.excluded:
                sql = sql.where(_exclude_places(t))
            if details.min_rank > 0:
                sql = sql.where(sa.or_(t.c.address_rank >= MIN_RANK_PARAM,
                                       t.c.search_rank >= MIN_RANK_PARAM))
            if details.max_rank < 30:
                sql = sql.where(sa.or_(t.c.address_rank <= MAX_RANK_PARAM,
                                       t.c.search_rank <= MAX_RANK_PARAM))

        # Cap the preselection at 10000 rows, ordered by descending importance.
        inner = sql.limit(10000).order_by(sa.desc(sa.text('importance'))).subquery()

        sql = sa.select(inner.c.place_id, inner.c.search_rank, inner.c.address_rank,
                        inner.c.country_code, inner.c.centroid, inner.c.importance,
                        inner.c.penalty)

        # If the query is not an address search or has a geographic preference,
        # preselect most important items to restrict the number of places
        # that need to be looked up in placex.
        if not self.housenumbers\
           and (details.viewbox is None or details.bounded_viewbox)\
           and (details.near is None or details.near_radius is not None)\
           and not self.qualifiers:
            # min_penalty is the lowest value of (penalty - importance)
            # among the preselected rows.
            sql = sql.add_columns(sa.func.first_value(inner.c.penalty - inner.c.importance)
                                    .over(order_by=inner.c.penalty - inner.c.importance)
                                    .label('min_penalty'))

            inner = sql.subquery()

            # Keep only rows that are less than 0.5 above that minimum.
            sql = sa.select(inner.c.place_id, inner.c.search_rank, inner.c.address_rank,
                            inner.c.country_code, inner.c.centroid, inner.c.importance,
                            inner.c.penalty)\
                    .where(inner.c.penalty - inner.c.importance < inner.c.min_penalty + 0.5)

        return sql.cte('searches')
730
731     async def lookup(self, conn: SearchConnection,
732                      details: SearchDetails) -> nres.SearchResults:
733         """ Find results for the search in the database.
734         """
735         t = conn.t.placex
736         tsearch = self._inner_search_name_cte(conn, details)
737
738         sql = _select_placex(t).join(tsearch, t.c.place_id == tsearch.c.place_id)
739
740         if details.geometry_output:
741             sql = _add_geometry_columns(sql, t.c.geometry, details)
742
743         penalty: SaExpression = tsearch.c.penalty
744
745         if self.postcodes:
746             tpc = conn.t.postcode
747             pcs = self.postcodes.values
748
749             pc_near = sa.select(sa.func.min(tpc.c.geometry.ST_Distance(t.c.centroid)))\
750                         .where(tpc.c.postcode.in_(pcs))\
751                         .scalar_subquery()
752             penalty += sa.case((t.c.postcode.in_(pcs), 0.0),
753                                else_=sa.func.coalesce(pc_near, cast(SaColumn, 2.0)))
754
755         if details.viewbox is not None and not details.bounded_viewbox:
756             penalty += sa.case((t.c.geometry.intersects(VIEWBOX_PARAM, use_index=False), 0.0),
757                                (t.c.geometry.intersects(VIEWBOX2_PARAM, use_index=False), 0.5),
758                                else_=1.0)
759
760         if details.near is not None:
761             sql = sql.add_columns((-tsearch.c.centroid.ST_Distance(NEAR_PARAM))
762                                   .label('importance'))
763             sql = sql.order_by(sa.desc(sa.text('importance')))
764         else:
765             sql = sql.order_by(penalty - tsearch.c.importance)
766             sql = sql.add_columns(tsearch.c.importance)
767
768         sql = sql.add_columns(penalty.label('accuracy'))\
769                  .order_by(sa.text('accuracy'))
770
771         if self.housenumbers:
772             hnr_list = '|'.join(self.housenumbers.values)
773             inner = sql.where(sa.or_(tsearch.c.address_rank < 30,
774                                      sa.func.RegexpWord(hnr_list, t.c.housenumber)))\
775                        .subquery()
776
777             # Housenumbers from placex
778             thnr = conn.t.placex.alias('hnr')
779             pid_list = sa.func.ArrayAgg(thnr.c.place_id)
780             place_sql = sa.select(pid_list)\
781                           .where(thnr.c.parent_place_id == inner.c.place_id)\
782                           .where(sa.func.RegexpWord(hnr_list, thnr.c.housenumber))\
783                           .where(thnr.c.linked_place_id == None)\
784                           .where(thnr.c.indexed_status == 0)
785
786             if details.excluded:
787                 place_sql = place_sql.where(thnr.c.place_id.not_in(sa.bindparam('excluded')))
788             if self.qualifiers:
789                 place_sql = place_sql.where(self.qualifiers.sql_restrict(thnr))
790
791             numerals = [int(n) for n in self.housenumbers.values
792                         if n.isdigit() and len(n) < 8]
793             interpol_sql: SaColumn
794             tiger_sql: SaColumn
795             if numerals and \
796                (not self.qualifiers or ('place', 'house') in self.qualifiers.values):
797                 # Housenumbers from interpolations
798                 interpol_sql = _make_interpolation_subquery(conn.t.osmline, inner,
799                                                             numerals, details)
800                 # Housenumbers from Tiger
801                 tiger_sql = sa.case((inner.c.country_code == 'us',
802                                      _make_interpolation_subquery(conn.t.tiger, inner,
803                                                                   numerals, details)
804                                      ), else_=None)
805             else:
806                 interpol_sql = sa.null()
807                 tiger_sql = sa.null()
808
809             unsort = sa.select(inner, place_sql.scalar_subquery().label('placex_hnr'),
810                                interpol_sql.label('interpol_hnr'),
811                                tiger_sql.label('tiger_hnr')).subquery('unsort')
812             sql = sa.select(unsort)\
813                     .order_by(sa.case((unsort.c.placex_hnr != None, 1),
814                                       (unsort.c.interpol_hnr != None, 2),
815                                       (unsort.c.tiger_hnr != None, 3),
816                                       else_=4),
817                               unsort.c.accuracy)
818         else:
819             sql = sql.where(t.c.linked_place_id == None)\
820                      .where(t.c.indexed_status == 0)
821             if self.qualifiers:
822                 sql = sql.where(self.qualifiers.sql_restrict(t))
823             if details.layers is not None:
824                 sql = sql.where(_filter_by_layer(t, details.layers))
825
826         sql = sql.limit(LIMIT_PARAM)
827
828         results = nres.SearchResults()
829         for row in await conn.execute(sql, _details_to_bind_params(details)):
830             result = nres.create_from_placex_row(row, nres.SearchResult)
831             assert result
832             result.bbox = Bbox.from_wkb(row.bbox)
833             result.accuracy = row.accuracy
834             if self.housenumbers and row.rank_address < 30:
835                 if row.placex_hnr:
836                     subs = _get_placex_housenumbers(conn, row.placex_hnr, details)
837                 elif row.interpol_hnr:
838                     subs = _get_osmline(conn, row.interpol_hnr, numerals, details)
839                 elif row.tiger_hnr:
840                     subs = _get_tiger(conn, row.tiger_hnr, numerals, row.osm_id, details)
841                 else:
842                     subs = None
843
844                 if subs is not None:
845                     async for sub in subs:
846                         assert sub.housenumber
847                         sub.accuracy = result.accuracy
848                         if not any(nr in self.housenumbers.values
849                                    for nr in sub.housenumber.split(';')):
850                             sub.accuracy += 0.6
851                         results.append(sub)
852
853                 # Only add the street as a result, if it meets all other
854                 # filter conditions.
855                 if (not details.excluded or result.place_id not in details.excluded)\
856                    and (not self.qualifiers or result.category in self.qualifiers.values)\
857                    and result.rank_address >= details.min_rank:
858                     result.accuracy += 1.0  # penalty for missing housenumber
859                     results.append(result)
860             else:
861                 results.append(result)
862
863         return results