]> git.openstreetmap.org Git - nominatim.git/blobdiff - nominatim/indexer/runners.py
Merge pull request #3367 from lonvia/address-word-counts
[nominatim.git] / nominatim / indexer / runners.py
index 068d7d0fa63edba9a45fe7aef9195d06d164996a..bbadd282ec96c538baa7ff3926e75bd5bd6cbe69 100644 (file)
@@ -1,50 +1,79 @@
+# SPDX-License-Identifier: GPL-2.0-only
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2022 by the Nominatim developer community.
+# For a full list of authors see the git log.
 """
 Mix-ins that provide the actual commands for the indexer for various indexing
 tasks.
 """
+from typing import Any, List
 import functools
 
-import psycopg2.extras
 from psycopg2 import sql as pysql
+import psycopg2.extras
+
+from nominatim.data.place_info import PlaceInfo
+from nominatim.tokenizer.base import AbstractAnalyzer
+from nominatim.db.async_connection import DBConnection
+from nominatim.typing import Query, DictCursorResult, DictCursorResults, Protocol
 
 # pylint: disable=C0111
 
-def _mk_valuelist(template, num):
+def _mk_valuelist(template: str, num: int) -> pysql.Composed:
     return pysql.SQL(',').join([pysql.SQL(template)] * num)
 
+def _analyze_place(place: DictCursorResult, analyzer: AbstractAnalyzer) -> psycopg2.extras.Json:
+    return psycopg2.extras.Json(analyzer.process_place(PlaceInfo(place)))
+
+
+class Runner(Protocol):
+    def name(self) -> str: ...
+    def sql_count_objects(self) -> Query: ...
+    def sql_get_objects(self) -> Query: ...
+    def get_place_details(self, worker: DBConnection,
+                          ids: DictCursorResults) -> DictCursorResults: ...
+    def index_places(self, worker: DBConnection, places: DictCursorResults) -> None: ...
+
+
 class AbstractPlacexRunner:
     """ Returns SQL commands for indexing of the placex table.
     """
     SELECT_SQL = pysql.SQL('SELECT place_id FROM placex ')
+    UPDATE_LINE = "(%s, %s::hstore, %s::hstore, %s::int, %s::jsonb)"
 
-    def __init__(self, rank, analyzer):
+    def __init__(self, rank: int, analyzer: AbstractAnalyzer) -> None:
         self.rank = rank
         self.analyzer = analyzer
 
 
-    @staticmethod
     @functools.lru_cache(maxsize=1)
-    def _index_sql(num_places):
+    def _index_sql(self, num_places: int) -> pysql.Composed:
         return pysql.SQL(
             """ UPDATE placex
-                SET indexed_status = 0, address = v.addr, token_info = v.ti
-                FROM (VALUES {}) as v(id, addr, ti)
+                SET indexed_status = 0, address = v.addr, token_info = v.ti,
+                    name = v.name, linked_place_id = v.linked_place_id
+                FROM (VALUES {}) as v(id, name, addr, linked_place_id, ti)
                 WHERE place_id = v.id
-            """).format(_mk_valuelist("(%s, %s::hstore, %s::jsonb)", num_places))
+            """).format(_mk_valuelist(AbstractPlacexRunner.UPDATE_LINE, num_places))
 
 
-    @staticmethod
-    def get_place_details(worker, ids):
-        worker.perform("""SELECT place_id, (placex_prepare_update(placex)).*
-                          FROM placex WHERE place_id IN %s""",
+    def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults:
+        worker.perform("""SELECT place_id, extra.*
+                          FROM placex, LATERAL placex_indexing_prepare(placex) as extra
+                          WHERE place_id IN %s""",
                        (tuple((p[0] for p in ids)), ))
 
+        return []
 
-    def index_places(self, worker, places):
-        values = []
+
+    def index_places(self, worker: DBConnection, places: DictCursorResults) -> None:
+        values: List[Any] = []
         for place in places:
-            values.extend((place[x] for x in ('place_id', 'address')))
-            values.append(psycopg2.extras.Json(self.analyzer.process_place(place)))
+            for field in ('place_id', 'name', 'address', 'linked_place_id'):
+                values.append(place[field])
+            values.append(_analyze_place(place, self.analyzer))
 
         worker.perform(self._index_sql(len(places)), values)
 
@@ -53,15 +82,15 @@ class RankRunner(AbstractPlacexRunner):
     """ Returns SQL commands for indexing one rank within the placex table.
     """
 
-    def name(self):
-        return "rank {}".format(self.rank)
+    def name(self) -> str:
+        return f"rank {self.rank}"
 
-    def sql_count_objects(self):
+    def sql_count_objects(self) -> pysql.Composed:
         return pysql.SQL("""SELECT count(*) FROM placex
                             WHERE rank_address = {} and indexed_status > 0
                          """).format(pysql.Literal(self.rank))
 
-    def sql_get_objects(self):
+    def sql_get_objects(self) -> pysql.Composed:
         return self.SELECT_SQL + pysql.SQL(
             """WHERE indexed_status > 0 and rank_address = {}
                ORDER BY geometry_sector
@@ -73,17 +102,17 @@ class BoundaryRunner(AbstractPlacexRunner):
         of a certain rank.
     """
 
-    def name(self):
-        return "boundaries rank {}".format(self.rank)
+    def name(self) -> str:
+        return f"boundaries rank {self.rank}"
 
-    def sql_count_objects(self):
+    def sql_count_objects(self) -> pysql.Composed:
         return pysql.SQL("""SELECT count(*) FROM placex
                             WHERE indexed_status > 0
                               AND rank_search = {}
                               AND class = 'boundary' and type = 'administrative'
                          """).format(pysql.Literal(self.rank))
 
-    def sql_get_objects(self):
+    def sql_get_objects(self) -> pysql.Composed:
         return self.SELECT_SQL + pysql.SQL(
             """WHERE indexed_status > 0 and rank_search = {}
                      and class = 'boundary' and type = 'administrative'
@@ -96,37 +125,33 @@ class InterpolationRunner:
         location_property_osmline.
     """
 
-    def __init__(self, analyzer):
+    def __init__(self, analyzer: AbstractAnalyzer) -> None:
         self.analyzer = analyzer
 
 
-    @staticmethod
-    def name():
+    def name(self) -> str:
         return "interpolation lines (location_property_osmline)"
 
-    @staticmethod
-    def sql_count_objects():
+    def sql_count_objects(self) -> str:
         return """SELECT count(*) FROM location_property_osmline
                   WHERE indexed_status > 0"""
 
-    @staticmethod
-    def sql_get_objects():
+    def sql_get_objects(self) -> str:
         return """SELECT place_id
                   FROM location_property_osmline
                   WHERE indexed_status > 0
                   ORDER BY geometry_sector"""
 
 
-    @staticmethod
-    def get_place_details(worker, ids):
+    def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults:
         worker.perform("""SELECT place_id, get_interpolation_address(address, osm_id) as address
                           FROM location_property_osmline WHERE place_id IN %s""",
                        (tuple((p[0] for p in ids)), ))
+        return []
 
 
-    @staticmethod
     @functools.lru_cache(maxsize=1)
-    def _index_sql(num_places):
+    def _index_sql(self, num_places: int) -> pysql.Composed:
         return pysql.SQL("""UPDATE location_property_osmline
                             SET indexed_status = 0, address = v.addr, token_info = v.ti
                             FROM (VALUES {}) as v(id, addr, ti)
@@ -134,36 +159,38 @@ class InterpolationRunner:
                          """).format(_mk_valuelist("(%s, %s::hstore, %s::jsonb)", num_places))
 
 
-    def index_places(self, worker, places):
-        values = []
+    def index_places(self, worker: DBConnection, places: DictCursorResults) -> None:
+        values: List[Any] = []
         for place in places:
             values.extend((place[x] for x in ('place_id', 'address')))
-            values.append(psycopg2.extras.Json(self.analyzer.process_place(place)))
+            values.append(_analyze_place(place, self.analyzer))
 
         worker.perform(self._index_sql(len(places)), values)
 
 
 
-class PostcodeRunner:
+class PostcodeRunner(Runner):
     """ Provides the SQL commands for indexing the location_postcode table.
     """
 
-    @staticmethod
-    def name():
+    def name(self) -> str:
         return "postcodes (location_postcode)"
 
-    @staticmethod
-    def sql_count_objects():
+
+    def sql_count_objects(self) -> str:
         return 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0'
 
-    @staticmethod
-    def sql_get_objects():
+
+    def sql_get_objects(self) -> str:
         return """SELECT place_id FROM location_postcode
                   WHERE indexed_status > 0
                   ORDER BY country_code, postcode"""
 
-    @staticmethod
-    def index_places(worker, ids):
+
+    def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults:
+        return ids
+
+    def index_places(self, worker: DBConnection, places: DictCursorResults) -> None:
         worker.perform(pysql.SQL("""UPDATE location_postcode SET indexed_status = 0
                                     WHERE place_id IN ({})""")
-                       .format(pysql.SQL(',').join((pysql.Literal(i[0]) for i in ids))))
+                       .format(pysql.SQL(',').join((pysql.Literal(i[0]) for i in places))))