X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/199532c802fafaa13ae77d1b24cf969e24e8a9d0..2dafc4cf4fc46ea4be293a458d565fbff645ac28:/nominatim/indexer/runners.py diff --git a/nominatim/indexer/runners.py b/nominatim/indexer/runners.py index 29261ee5..bbadd282 100644 --- a/nominatim/indexer/runners.py +++ b/nominatim/indexer/runners.py @@ -1,31 +1,55 @@ +# SPDX-License-Identifier: GPL-2.0-only +# +# This file is part of Nominatim. (https://nominatim.org) +# +# Copyright (C) 2022 by the Nominatim developer community. +# For a full list of authors see the git log. """ Mix-ins that provide the actual commands for the indexer for various indexing tasks. """ +from typing import Any, List import functools -import psycopg2.extras from psycopg2 import sql as pysql +import psycopg2.extras + +from nominatim.data.place_info import PlaceInfo +from nominatim.tokenizer.base import AbstractAnalyzer +from nominatim.db.async_connection import DBConnection +from nominatim.typing import Query, DictCursorResult, DictCursorResults, Protocol # pylint: disable=C0111 -def _mk_valuelist(template, num): +def _mk_valuelist(template: str, num: int) -> pysql.Composed: return pysql.SQL(',').join([pysql.SQL(template)] * num) +def _analyze_place(place: DictCursorResult, analyzer: AbstractAnalyzer) -> psycopg2.extras.Json: + return psycopg2.extras.Json(analyzer.process_place(PlaceInfo(place))) + + +class Runner(Protocol): + def name(self) -> str: ... + def sql_count_objects(self) -> Query: ... + def sql_get_objects(self) -> Query: ... + def get_place_details(self, worker: DBConnection, + ids: DictCursorResults) -> DictCursorResults: ... + def index_places(self, worker: DBConnection, places: DictCursorResults) -> None: ... + + class AbstractPlacexRunner: """ Returns SQL commands for indexing of the placex table. """ SELECT_SQL = pysql.SQL('SELECT place_id FROM placex ') UPDATE_LINE = "(%s, %s::hstore, %s::hstore, %s::int, %s::jsonb)" - def __init__(self, rank, analyzer): + def __init__(self, rank: int, analyzer: AbstractAnalyzer) -> None: self.rank = rank self.analyzer = analyzer - @staticmethod @functools.lru_cache(maxsize=1) - def _index_sql(num_places): + def _index_sql(self, num_places: int) -> pysql.Composed: return pysql.SQL( """ UPDATE placex SET indexed_status = 0, address = v.addr, token_info = v.ti, @@ -35,19 +59,21 @@ class AbstractPlacexRunner: """).format(_mk_valuelist(AbstractPlacexRunner.UPDATE_LINE, num_places)) - @staticmethod - def get_place_details(worker, ids): - worker.perform("""SELECT place_id, (placex_prepare_update(placex)).* - FROM placex WHERE place_id IN %s""", + def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults: + worker.perform("""SELECT place_id, extra.* + FROM placex, LATERAL placex_indexing_prepare(placex) as extra + WHERE place_id IN %s""", (tuple((p[0] for p in ids)), )) + return [] + - def index_places(self, worker, places): - values = [] + def index_places(self, worker: DBConnection, places: DictCursorResults) -> None: + values: List[Any] = [] for place in places: for field in ('place_id', 'name', 'address', 'linked_place_id'): values.append(place[field]) - values.append(psycopg2.extras.Json(self.analyzer.process_place(place))) + values.append(_analyze_place(place, self.analyzer)) worker.perform(self._index_sql(len(places)), values) @@ -56,15 +82,15 @@ class RankRunner(AbstractPlacexRunner): """ Returns SQL commands for indexing one rank within the placex table. """ - def name(self): - return "rank {}".format(self.rank) + def name(self) -> str: + return f"rank {self.rank}" - def sql_count_objects(self): + def sql_count_objects(self) -> pysql.Composed: return pysql.SQL("""SELECT count(*) FROM placex WHERE rank_address = {} and indexed_status > 0 """).format(pysql.Literal(self.rank)) - def sql_get_objects(self): + def sql_get_objects(self) -> pysql.Composed: return self.SELECT_SQL + pysql.SQL( """WHERE indexed_status > 0 and rank_address = {} ORDER BY geometry_sector @@ -76,17 +102,17 @@ class BoundaryRunner(AbstractPlacexRunner): of a certain rank. """ - def name(self): - return "boundaries rank {}".format(self.rank) + def name(self) -> str: + return f"boundaries rank {self.rank}" - def sql_count_objects(self): + def sql_count_objects(self) -> pysql.Composed: return pysql.SQL("""SELECT count(*) FROM placex WHERE indexed_status > 0 AND rank_search = {} AND class = 'boundary' and type = 'administrative' """).format(pysql.Literal(self.rank)) - def sql_get_objects(self): + def sql_get_objects(self) -> pysql.Composed: return self.SELECT_SQL + pysql.SQL( """WHERE indexed_status > 0 and rank_search = {} and class = 'boundary' and type = 'administrative' @@ -99,37 +125,33 @@ class InterpolationRunner: location_property_osmline. """ - def __init__(self, analyzer): + def __init__(self, analyzer: AbstractAnalyzer) -> None: self.analyzer = analyzer - @staticmethod - def name(): + def name(self) -> str: return "interpolation lines (location_property_osmline)" - @staticmethod - def sql_count_objects(): + def sql_count_objects(self) -> str: return """SELECT count(*) FROM location_property_osmline WHERE indexed_status > 0""" - @staticmethod - def sql_get_objects(): + def sql_get_objects(self) -> str: return """SELECT place_id FROM location_property_osmline WHERE indexed_status > 0 ORDER BY geometry_sector""" - @staticmethod - def get_place_details(worker, ids): + def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults: worker.perform("""SELECT place_id, get_interpolation_address(address, osm_id) as address FROM location_property_osmline WHERE place_id IN %s""", (tuple((p[0] for p in ids)), )) + return [] - @staticmethod @functools.lru_cache(maxsize=1) - def _index_sql(num_places): + def _index_sql(self, num_places: int) -> pysql.Composed: return pysql.SQL("""UPDATE location_property_osmline SET indexed_status = 0, address = v.addr, token_info = v.ti FROM (VALUES {}) as v(id, addr, ti) @@ -137,36 +159,38 @@ class InterpolationRunner: """).format(_mk_valuelist("(%s, %s::hstore, %s::jsonb)", num_places)) - def index_places(self, worker, places): - values = [] + def index_places(self, worker: DBConnection, places: DictCursorResults) -> None: + values: List[Any] = [] for place in places: values.extend((place[x] for x in ('place_id', 'address'))) - values.append(psycopg2.extras.Json(self.analyzer.process_place(place))) + values.append(_analyze_place(place, self.analyzer)) worker.perform(self._index_sql(len(places)), values) -class PostcodeRunner: +class PostcodeRunner(Runner): """ Provides the SQL commands for indexing the location_postcode table. """ - @staticmethod - def name(): + def name(self) -> str: return "postcodes (location_postcode)" - @staticmethod - def sql_count_objects(): + + def sql_count_objects(self) -> str: return 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0' - @staticmethod - def sql_get_objects(): + + def sql_get_objects(self) -> str: return """SELECT place_id FROM location_postcode WHERE indexed_status > 0 ORDER BY country_code, postcode""" - @staticmethod - def index_places(worker, ids): + + def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults: + return ids + + def index_places(self, worker: DBConnection, places: DictCursorResults) -> None: worker.perform(pysql.SQL("""UPDATE location_postcode SET indexed_status = 0 WHERE place_id IN ({})""") - .format(pysql.SQL(',').join((pysql.Literal(i[0]) for i in ids)))) + .format(pysql.SQL(',').join((pysql.Literal(i[0]) for i in places))))