]> git.openstreetmap.org Git - nominatim.git/blobdiff - src/nominatim_db/indexer/runners.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / src / nominatim_db / indexer / runners.py
index 7b98e2401beab3b820941d76218b1137f8afb97f..d7737c07230c3d7df4b1579c986ba68d046333df 100644 (file)
@@ -8,14 +8,14 @@
 Mix-ins that provide the actual commands for the indexer for various indexing
 tasks.
 """
-from typing import Any, List
-import functools
+from typing import Any, Sequence
 
-from psycopg2 import sql as pysql
-import psycopg2.extras
+from psycopg import sql as pysql
+from psycopg.abc import Query
+from psycopg.rows import DictRow
+from psycopg.types.json import Json
 
-from ..typing import Query, DictCursorResult, DictCursorResults, Protocol
-from ..db.async_connection import DBConnection
+from ..typing import Protocol
 from ..data.place_info import PlaceInfo
 from ..tokenizer.base import AbstractAnalyzer
 
@@ -24,58 +24,48 @@ from ..tokenizer.base import AbstractAnalyzer
 def _mk_valuelist(template: str, num: int) -> pysql.Composed:
     return pysql.SQL(',').join([pysql.SQL(template)] * num)
 
-def _analyze_place(place: DictCursorResult, analyzer: AbstractAnalyzer) -> psycopg2.extras.Json:
-    return psycopg2.extras.Json(analyzer.process_place(PlaceInfo(place)))
+def _analyze_place(place: DictRow, analyzer: AbstractAnalyzer) -> Json:
+    return Json(analyzer.process_place(PlaceInfo(place)))
 
 
 class Runner(Protocol):
     def name(self) -> str: ...
     def sql_count_objects(self) -> Query: ...
     def sql_get_objects(self) -> Query: ...
-    def get_place_details(self, worker: DBConnection,
-                          ids: DictCursorResults) -> DictCursorResults: ...
-    def index_places(self, worker: DBConnection, places: DictCursorResults) -> None: ...
+    def index_places_query(self, batch_size: int) -> Query: ...
+    def index_places_params(self, place: DictRow) -> Sequence[Any]: ...
 
 
+SELECT_SQL = pysql.SQL("""SELECT place_id, extra.*
+                          FROM (SELECT * FROM placex {}) as px,
+                          LATERAL placex_indexing_prepare(px) as extra """)
+UPDATE_LINE = "(%s, %s::hstore, %s::hstore, %s::int, %s::jsonb)"
+
 class AbstractPlacexRunner:
     """ Returns SQL commands for indexing of the placex table.
     """
-    SELECT_SQL = pysql.SQL('SELECT place_id FROM placex ')
-    UPDATE_LINE = "(%s, %s::hstore, %s::hstore, %s::int, %s::jsonb)"
 
     def __init__(self, rank: int, analyzer: AbstractAnalyzer) -> None:
         self.rank = rank
         self.analyzer = analyzer
 
 
-    @functools.lru_cache(maxsize=1)
-    def _index_sql(self, num_places: int) -> pysql.Composed:
+    def index_places_query(self, batch_size: int) -> Query:
         return pysql.SQL(
             """ UPDATE placex
                 SET indexed_status = 0, address = v.addr, token_info = v.ti,
                     name = v.name, linked_place_id = v.linked_place_id
                 FROM (VALUES {}) as v(id, name, addr, linked_place_id, ti)
                 WHERE place_id = v.id
-            """).format(_mk_valuelist(AbstractPlacexRunner.UPDATE_LINE, num_places))
-
-
-    def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults:
-        worker.perform("""SELECT place_id, extra.*
-                          FROM placex, LATERAL placex_indexing_prepare(placex) as extra
-                          WHERE place_id IN %s""",
-                       (tuple((p[0] for p in ids)), ))
+            """).format(_mk_valuelist(UPDATE_LINE, batch_size))
 
-        return []
 
-
-    def index_places(self, worker: DBConnection, places: DictCursorResults) -> None:
-        values: List[Any] = []
-        for place in places:
-            for field in ('place_id', 'name', 'address', 'linked_place_id'):
-                values.append(place[field])
-            values.append(_analyze_place(place, self.analyzer))
-
-        worker.perform(self._index_sql(len(places)), values)
+    def index_places_params(self, place: DictRow) -> Sequence[Any]:
+        return (place['place_id'],
+                place['name'],
+                place['address'],
+                place['linked_place_id'],
+                _analyze_place(place, self.analyzer))
 
 
 class RankRunner(AbstractPlacexRunner):
@@ -91,10 +81,10 @@ class RankRunner(AbstractPlacexRunner):
                          """).format(pysql.Literal(self.rank))
 
     def sql_get_objects(self) -> pysql.Composed:
-        return self.SELECT_SQL + pysql.SQL(
-            """WHERE indexed_status > 0 and rank_address = {}
-               ORDER BY geometry_sector
-            """).format(pysql.Literal(self.rank))
+        return SELECT_SQL.format(pysql.SQL(
+                """WHERE placex.indexed_status > 0 and placex.rank_address = {}
+                   ORDER BY placex.geometry_sector
+                """).format(pysql.Literal(self.rank)))
 
 
 class BoundaryRunner(AbstractPlacexRunner):
@@ -105,19 +95,19 @@ class BoundaryRunner(AbstractPlacexRunner):
     def name(self) -> str:
         return f"boundaries rank {self.rank}"
 
-    def sql_count_objects(self) -> pysql.Composed:
+    def sql_count_objects(self) -> Query:
         return pysql.SQL("""SELECT count(*) FROM placex
                             WHERE indexed_status > 0
                               AND rank_search = {}
                               AND class = 'boundary' and type = 'administrative'
                          """).format(pysql.Literal(self.rank))
 
-    def sql_get_objects(self) -> pysql.Composed:
-        return self.SELECT_SQL + pysql.SQL(
-            """WHERE indexed_status > 0 and rank_search = {}
-                     and class = 'boundary' and type = 'administrative'
-               ORDER BY partition, admin_level
-            """).format(pysql.Literal(self.rank))
+    def sql_get_objects(self) -> Query:
+        return SELECT_SQL.format(pysql.SQL(
+                """WHERE placex.indexed_status > 0 and placex.rank_search = {}
+                         and placex.class = 'boundary' and placex.type = 'administrative'
+                   ORDER BY placex.partition, placex.admin_level
+                """).format(pysql.Literal(self.rank)))
 
 
 class InterpolationRunner:
@@ -132,40 +122,29 @@ class InterpolationRunner:
     def name(self) -> str:
         return "interpolation lines (location_property_osmline)"
 
-    def sql_count_objects(self) -> str:
+    def sql_count_objects(self) -> Query:
         return """SELECT count(*) FROM location_property_osmline
                   WHERE indexed_status > 0"""
 
-    def sql_get_objects(self) -> str:
-        return """SELECT place_id
+
+    def sql_get_objects(self) -> Query:
+        return """SELECT place_id, get_interpolation_address(address, osm_id) as address
                   FROM location_property_osmline
                   WHERE indexed_status > 0
                   ORDER BY geometry_sector"""
 
 
-    def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults:
-        worker.perform("""SELECT place_id, get_interpolation_address(address, osm_id) as address
-                          FROM location_property_osmline WHERE place_id IN %s""",
-                       (tuple((p[0] for p in ids)), ))
-        return []
-
-
-    @functools.lru_cache(maxsize=1)
-    def _index_sql(self, num_places: int) -> pysql.Composed:
+    def index_places_query(self, batch_size: int) -> Query:
         return pysql.SQL("""UPDATE location_property_osmline
                             SET indexed_status = 0, address = v.addr, token_info = v.ti
                             FROM (VALUES {}) as v(id, addr, ti)
                             WHERE place_id = v.id
-                         """).format(_mk_valuelist("(%s, %s::hstore, %s::jsonb)", num_places))
-
+                         """).format(_mk_valuelist("(%s, %s::hstore, %s::jsonb)", batch_size))
 
-    def index_places(self, worker: DBConnection, places: DictCursorResults) -> None:
-        values: List[Any] = []
-        for place in places:
-            values.extend((place[x] for x in ('place_id', 'address')))
-            values.append(_analyze_place(place, self.analyzer))
 
-        worker.perform(self._index_sql(len(places)), values)
+    def index_places_params(self, place: DictRow) -> Sequence[Any]:
+        return (place['place_id'], place['address'],
+                _analyze_place(place, self.analyzer))
 
 
 
@@ -177,20 +156,21 @@ class PostcodeRunner(Runner):
         return "postcodes (location_postcode)"
 
 
-    def sql_count_objects(self) -> str:
+    def sql_count_objects(self) -> Query:
         return 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0'
 
 
-    def sql_get_objects(self) -> str:
+    def sql_get_objects(self) -> Query:
         return """SELECT place_id FROM location_postcode
                   WHERE indexed_status > 0
                   ORDER BY country_code, postcode"""
 
 
-    def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults:
-        return ids
+    def index_places_query(self, batch_size: int) -> Query:
+        return pysql.SQL("""UPDATE location_postcode SET indexed_status = 0
+                                    WHERE place_id IN ({})""")\
+                    .format(pysql.SQL(',').join((pysql.Placeholder() for _ in range(batch_size))))
+
 
-    def index_places(self, worker: DBConnection, places: DictCursorResults) -> None:
-        worker.perform(pysql.SQL("""UPDATE location_postcode SET indexed_status = 0
-                                    WHERE place_id IN ({})""")
-                       .format(pysql.SQL(',').join((pysql.Literal(i[0]) for i in places))))
+    def index_places_params(self, place: DictRow) -> Sequence[Any]:
+        return (place['place_id'], )