X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/4da4cbfe27a576ae011430b2de205c74435e241b..bc75e5626ec59523707b842022ecf0bdc159cf06:/src/nominatim_db/indexer/runners.py diff --git a/src/nominatim_db/indexer/runners.py b/src/nominatim_db/indexer/runners.py index 7b98e240..d7737c07 100644 --- a/src/nominatim_db/indexer/runners.py +++ b/src/nominatim_db/indexer/runners.py @@ -8,14 +8,14 @@ Mix-ins that provide the actual commands for the indexer for various indexing tasks. """ -from typing import Any, List -import functools +from typing import Any, Sequence -from psycopg2 import sql as pysql -import psycopg2.extras +from psycopg import sql as pysql +from psycopg.abc import Query +from psycopg.rows import DictRow +from psycopg.types.json import Json -from ..typing import Query, DictCursorResult, DictCursorResults, Protocol -from ..db.async_connection import DBConnection +from ..typing import Protocol from ..data.place_info import PlaceInfo from ..tokenizer.base import AbstractAnalyzer @@ -24,58 +24,48 @@ from ..tokenizer.base import AbstractAnalyzer def _mk_valuelist(template: str, num: int) -> pysql.Composed: return pysql.SQL(',').join([pysql.SQL(template)] * num) -def _analyze_place(place: DictCursorResult, analyzer: AbstractAnalyzer) -> psycopg2.extras.Json: - return psycopg2.extras.Json(analyzer.process_place(PlaceInfo(place))) +def _analyze_place(place: DictRow, analyzer: AbstractAnalyzer) -> Json: + return Json(analyzer.process_place(PlaceInfo(place))) class Runner(Protocol): def name(self) -> str: ... def sql_count_objects(self) -> Query: ... def sql_get_objects(self) -> Query: ... - def get_place_details(self, worker: DBConnection, - ids: DictCursorResults) -> DictCursorResults: ... - def index_places(self, worker: DBConnection, places: DictCursorResults) -> None: ... + def index_places_query(self, batch_size: int) -> Query: ... + def index_places_params(self, place: DictRow) -> Sequence[Any]: ... +SELECT_SQL = pysql.SQL("""SELECT place_id, extra.* + FROM (SELECT * FROM placex {}) as px, + LATERAL placex_indexing_prepare(px) as extra """) +UPDATE_LINE = "(%s, %s::hstore, %s::hstore, %s::int, %s::jsonb)" + class AbstractPlacexRunner: """ Returns SQL commands for indexing of the placex table. """ - SELECT_SQL = pysql.SQL('SELECT place_id FROM placex ') - UPDATE_LINE = "(%s, %s::hstore, %s::hstore, %s::int, %s::jsonb)" def __init__(self, rank: int, analyzer: AbstractAnalyzer) -> None: self.rank = rank self.analyzer = analyzer - @functools.lru_cache(maxsize=1) - def _index_sql(self, num_places: int) -> pysql.Composed: + def index_places_query(self, batch_size: int) -> Query: return pysql.SQL( """ UPDATE placex SET indexed_status = 0, address = v.addr, token_info = v.ti, name = v.name, linked_place_id = v.linked_place_id FROM (VALUES {}) as v(id, name, addr, linked_place_id, ti) WHERE place_id = v.id - """).format(_mk_valuelist(AbstractPlacexRunner.UPDATE_LINE, num_places)) - - - def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults: - worker.perform("""SELECT place_id, extra.* - FROM placex, LATERAL placex_indexing_prepare(placex) as extra - WHERE place_id IN %s""", - (tuple((p[0] for p in ids)), )) + """).format(_mk_valuelist(UPDATE_LINE, batch_size)) - return [] - - def index_places(self, worker: DBConnection, places: DictCursorResults) -> None: - values: List[Any] = [] - for place in places: - for field in ('place_id', 'name', 'address', 'linked_place_id'): - values.append(place[field]) - values.append(_analyze_place(place, self.analyzer)) - - worker.perform(self._index_sql(len(places)), values) + def index_places_params(self, place: DictRow) -> Sequence[Any]: + return (place['place_id'], + place['name'], + place['address'], + place['linked_place_id'], + _analyze_place(place, self.analyzer)) class RankRunner(AbstractPlacexRunner): @@ -91,10 +81,10 @@ class RankRunner(AbstractPlacexRunner): """).format(pysql.Literal(self.rank)) def sql_get_objects(self) -> pysql.Composed: - return self.SELECT_SQL + pysql.SQL( - """WHERE indexed_status > 0 and rank_address = {} - ORDER BY geometry_sector - """).format(pysql.Literal(self.rank)) + return SELECT_SQL.format(pysql.SQL( + """WHERE placex.indexed_status > 0 and placex.rank_address = {} + ORDER BY placex.geometry_sector + """).format(pysql.Literal(self.rank))) class BoundaryRunner(AbstractPlacexRunner): @@ -105,19 +95,19 @@ class BoundaryRunner(AbstractPlacexRunner): def name(self) -> str: return f"boundaries rank {self.rank}" - def sql_count_objects(self) -> pysql.Composed: + def sql_count_objects(self) -> Query: return pysql.SQL("""SELECT count(*) FROM placex WHERE indexed_status > 0 AND rank_search = {} AND class = 'boundary' and type = 'administrative' """).format(pysql.Literal(self.rank)) - def sql_get_objects(self) -> pysql.Composed: - return self.SELECT_SQL + pysql.SQL( - """WHERE indexed_status > 0 and rank_search = {} - and class = 'boundary' and type = 'administrative' - ORDER BY partition, admin_level - """).format(pysql.Literal(self.rank)) + def sql_get_objects(self) -> Query: + return SELECT_SQL.format(pysql.SQL( + """WHERE placex.indexed_status > 0 and placex.rank_search = {} + and placex.class = 'boundary' and placex.type = 'administrative' + ORDER BY placex.partition, placex.admin_level + """).format(pysql.Literal(self.rank))) class InterpolationRunner: @@ -132,40 +122,29 @@ class InterpolationRunner: def name(self) -> str: return "interpolation lines (location_property_osmline)" - def sql_count_objects(self) -> str: + def sql_count_objects(self) -> Query: return """SELECT count(*) FROM location_property_osmline WHERE indexed_status > 0""" - def sql_get_objects(self) -> str: - return """SELECT place_id + + def sql_get_objects(self) -> Query: + return """SELECT place_id, get_interpolation_address(address, osm_id) as address FROM location_property_osmline WHERE indexed_status > 0 ORDER BY geometry_sector""" - def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults: - worker.perform("""SELECT place_id, get_interpolation_address(address, osm_id) as address - FROM location_property_osmline WHERE place_id IN %s""", - (tuple((p[0] for p in ids)), )) - return [] - - - @functools.lru_cache(maxsize=1) - def _index_sql(self, num_places: int) -> pysql.Composed: + def index_places_query(self, batch_size: int) -> Query: return pysql.SQL("""UPDATE location_property_osmline SET indexed_status = 0, address = v.addr, token_info = v.ti FROM (VALUES {}) as v(id, addr, ti) WHERE place_id = v.id - """).format(_mk_valuelist("(%s, %s::hstore, %s::jsonb)", num_places)) - + """).format(_mk_valuelist("(%s, %s::hstore, %s::jsonb)", batch_size)) - def index_places(self, worker: DBConnection, places: DictCursorResults) -> None: - values: List[Any] = [] - for place in places: - values.extend((place[x] for x in ('place_id', 'address'))) - values.append(_analyze_place(place, self.analyzer)) - worker.perform(self._index_sql(len(places)), values) + def index_places_params(self, place: DictRow) -> Sequence[Any]: + return (place['place_id'], place['address'], + _analyze_place(place, self.analyzer)) @@ -177,20 +156,21 @@ class PostcodeRunner(Runner): return "postcodes (location_postcode)" - def sql_count_objects(self) -> str: + def sql_count_objects(self) -> Query: return 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0' - def sql_get_objects(self) -> str: + def sql_get_objects(self) -> Query: return """SELECT place_id FROM location_postcode WHERE indexed_status > 0 ORDER BY country_code, postcode""" - def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults: - return ids + def index_places_query(self, batch_size: int) -> Query: + return pysql.SQL("""UPDATE location_postcode SET indexed_status = 0 + WHERE place_id IN ({})""")\ + .format(pysql.SQL(',').join((pysql.Placeholder() for _ in range(batch_size)))) + - def index_places(self, worker: DBConnection, places: DictCursorResults) -> None: - worker.perform(pysql.SQL("""UPDATE location_postcode SET indexed_status = 0 - WHERE place_id IN ({})""") - .format(pysql.SQL(',').join((pysql.Literal(i[0]) for i in places)))) + def index_places_params(self, place: DictRow) -> Sequence[Any]: + return (place['place_id'], )