Mix-ins that provide the actual commands for the indexer for various indexing
tasks.
"""
-from typing import Any, List
-import functools
+from typing import Any, Sequence
-from psycopg2 import sql as pysql
-import psycopg2.extras
+from psycopg import sql as pysql
+from psycopg.abc import Query
+from psycopg.rows import DictRow
+from psycopg.types.json import Json
-from nominatim_core.typing import Query, DictCursorResult, DictCursorResults, Protocol
-from nominatim_core.db.async_connection import DBConnection
+from ..typing import Protocol
from ..data.place_info import PlaceInfo
from ..tokenizer.base import AbstractAnalyzer
-# pylint: disable=C0111
def _mk_valuelist(template: str, num: int) -> pysql.Composed:
    """ Return an SQL fragment that consists of `num` copies of the
        snippet `template`, separated by commas.
    """
    snippet = pysql.SQL(template)
    return pysql.SQL(',').join(snippet for _ in range(num))
def _analyze_place(place: DictRow, analyzer: AbstractAnalyzer) -> Json:
    """ Let the analyzer process the given place row and wrap the
        outcome in a Json object, so that it can be handed to the
        database as a query parameter.
    """
    token_info = analyzer.process_place(PlaceInfo(place))
    return Json(token_info)
class Runner(Protocol):
    """ Interface of the classes that supply the SQL for one indexing task.
    """

    def name(self) -> str:
        """ Return a descriptive name for the objects handled by the runner.
        """
        ...

    def sql_count_objects(self) -> Query:
        """ Return the query that counts the objects to process.
        """
        ...

    def sql_get_objects(self) -> Query:
        """ Return the query that fetches the objects to process.
        """
        ...

    def index_places_query(self, batch_size: int) -> Query:
        """ Return the update query for a batch of `batch_size` objects.
        """
        ...

    def index_places_params(self, place: DictRow) -> Sequence[Any]:
        """ Return the parameters that a single object contributes to
            the update query.
        """
        ...
# Skeleton of the query that fetches the places to index from placex.
# The '{}' takes the WHERE/ORDER BY clause that is applied to placex
# before placex_indexing_prepare() is run on the selected rows.
SELECT_SQL = pysql.SQL("""SELECT place_id, extra.*
                          FROM (SELECT * FROM placex {}) as px,
                          LATERAL placex_indexing_prepare(px) as extra """)

# Placeholders for one row of the VALUES list in the placex update:
# id, name, address, linked place id and token info.
UPDATE_LINE = "(%s, %s::hstore, %s::hstore, %s::int, %s::jsonb)"
class AbstractPlacexRunner:
    """ Returns SQL commands for indexing of the placex table.
    """

    def __init__(self, rank: int, analyzer: AbstractAnalyzer) -> None:
        self.rank = rank
        self.analyzer = analyzer

    def index_places_query(self, batch_size: int) -> Query:
        """ Return the UPDATE statement that writes back the indexing
            results for `batch_size` places at once. The statement
            expects the parameters of all places of the batch in
            sequence, each in the order of UPDATE_LINE.
        """
        update_sql = pysql.SQL(
            """ UPDATE placex
                SET indexed_status = 0, address = v.addr, token_info = v.ti,
                    name = v.name, linked_place_id = v.linked_place_id
                FROM (VALUES {}) as v(id, name, addr, linked_place_id, ti)
                WHERE place_id = v.id
            """)
        value_rows = _mk_valuelist(UPDATE_LINE, batch_size)

        return update_sql.format(value_rows)

    def index_places_params(self, place: DictRow) -> Sequence[Any]:
        """ Return the update parameters for a single place: the plain
            columns taken from the row followed by the token info
            computed by the analyzer.
        """
        plain_fields = tuple(place[key] for key in
                             ('place_id', 'name', 'address', 'linked_place_id'))

        return plain_fields + (_analyze_place(place, self.analyzer), )
class RankRunner(AbstractPlacexRunner):
""").format(pysql.Literal(self.rank))
def sql_get_objects(self) -> pysql.Composed:
    """ Return the query that fetches the placex rows with the address
        rank of this runner that still have a positive indexed_status,
        ordered by geometry sector.
    """
    rank_filter = pysql.SQL(
        """WHERE placex.indexed_status > 0 and placex.rank_address = {}
           ORDER BY placex.geometry_sector
        """).format(pysql.Literal(self.rank))

    return SELECT_SQL.format(rank_filter)
class BoundaryRunner(AbstractPlacexRunner):
    """ Returns SQL commands for indexing the administrative boundaries
        of one search rank in the placex table.
    """

    def name(self) -> str:
        return "boundaries rank {}".format(self.rank)

    def sql_count_objects(self) -> Query:
        """ Return the query that counts the administrative boundaries
            of the runner's search rank with a positive indexed_status.
        """
        count_sql = pysql.SQL("""SELECT count(*) FROM placex
                                 WHERE indexed_status > 0
                                   AND rank_search = {}
                                   AND class = 'boundary' and type = 'administrative'
                              """)

        return count_sql.format(pysql.Literal(self.rank))

    def sql_get_objects(self) -> Query:
        """ Return the query that fetches the administrative boundaries
            of the runner's search rank with a positive indexed_status,
            ordered by partition and admin level.
        """
        boundary_filter = pysql.SQL(
            """WHERE placex.indexed_status > 0 and placex.rank_search = {}
                     and placex.class = 'boundary' and placex.type = 'administrative'
               ORDER BY placex.partition, placex.admin_level
            """).format(pysql.Literal(self.rank))

        return SELECT_SQL.format(boundary_filter)
class InterpolationRunner:
    """ Returns SQL commands for indexing the address interpolation
        lines in the location_property_osmline table.
    """

    def __init__(self, analyzer: AbstractAnalyzer) -> None:
        self.analyzer = analyzer

    def name(self) -> str:
        description = "interpolation lines (location_property_osmline)"
        return description

    def sql_count_objects(self) -> Query:
        """ Return the query that counts the interpolation lines with
            a positive indexed_status.
        """
        count_sql = """SELECT count(*) FROM location_property_osmline
                       WHERE indexed_status > 0"""
        return count_sql

    def sql_get_objects(self) -> Query:
        """ Return the query that fetches id and interpolation address
            of the lines with a positive indexed_status, ordered by
            geometry sector.
        """
        select_sql = """SELECT place_id, get_interpolation_address(address, osm_id) as address
                        FROM location_property_osmline
                        WHERE indexed_status > 0
                        ORDER BY geometry_sector"""
        return select_sql

    def index_places_query(self, batch_size: int) -> Query:
        """ Return the UPDATE statement that writes back address and
            token info for `batch_size` interpolation lines at once.
        """
        update_sql = pysql.SQL("""UPDATE location_property_osmline
                                  SET indexed_status = 0, address = v.addr, token_info = v.ti
                                  FROM (VALUES {}) as v(id, addr, ti)
                                  WHERE place_id = v.id
                               """)
        value_rows = _mk_valuelist("(%s, %s::hstore, %s::jsonb)", batch_size)

        return update_sql.format(value_rows)

    def index_places_params(self, place: DictRow) -> Sequence[Any]:
        """ Return the update parameters for a single line: id, address
            and the token info computed by the analyzer.
        """
        plain_fields = tuple(place[key] for key in ('place_id', 'address'))

        return plain_fields + (_analyze_place(place, self.analyzer), )
class PostcodeRunner(Runner):
    """ Provides the SQL commands for indexing the location_postcode table.
    """

    def name(self) -> str:
        description = "postcodes (location_postcode)"
        return description

    def sql_count_objects(self) -> Query:
        """ Return the query that counts the postcodes with a positive
            indexed_status.
        """
        count_sql = 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0'
        return count_sql

    def sql_get_objects(self) -> Query:
        """ Return the query that fetches the ids of the postcodes with
            a positive indexed_status, ordered by country and postcode.
        """
        select_sql = """SELECT place_id FROM location_postcode
                        WHERE indexed_status > 0
                        ORDER BY country_code, postcode"""
        return select_sql

    def index_places_query(self, batch_size: int) -> Query:
        """ Return the UPDATE statement that resets the indexed_status
            of `batch_size` postcodes, with one placeholder per place id.
        """
        update_sql = pysql.SQL("""UPDATE location_postcode SET indexed_status = 0
                                  WHERE place_id IN ({})""")
        id_slots = pysql.SQL(',').join([pysql.Placeholder()] * batch_size)

        return update_sql.format(id_slots)

    def index_places_params(self, place: DictRow) -> Sequence[Any]:
        """ Return the update parameters for a single postcode, which
            is only its place id.
        """
        place_id = place['place_id']
        return (place_id, )