from textwrap import dedent
from icu import Transliterator
-import psycopg2
-import psycopg2.extras
+import psycopg
+from psycopg import sql as pysql
from ..errors import UsageError
-from ..db.connection import connect, Connection
+from ..db.connection import connect, Connection, drop_tables, table_exists,\
+ execute_scalar, register_hstore
from ..config import Configuration
from ..db import properties
from ..db import utils as db_utils
"""
with conn.cursor() as cur:
try:
- cur.execute("""CREATE FUNCTION nominatim_test_import_func(text)
- RETURNS text AS %s, 'transliteration'
- LANGUAGE c IMMUTABLE STRICT;
- DROP FUNCTION nominatim_test_import_func(text)
- """, (f'{module_dir}/nominatim.so', ))
- except psycopg2.DatabaseError as err:
+ cur.execute(pysql.SQL("""CREATE FUNCTION nominatim_test_import_func(text)
+ RETURNS text AS {}, 'transliteration'
+ LANGUAGE c IMMUTABLE STRICT;
+ DROP FUNCTION nominatim_test_import_func(text)
+ """).format(pysql.Literal(f'{module_dir}/nominatim.so')))
+ except psycopg.DatabaseError as err:
LOG.fatal("Error accessing database module: %s", err)
raise UsageError("Database module cannot be accessed.") from err
* Can nominatim.so be accessed by the database user?
"""
with connect(self.dsn) as conn:
- with conn.cursor() as cur:
- try:
- out = cur.scalar("SELECT make_standard_name('a')")
- except psycopg2.Error as err:
- return hint.format(error=str(err))
+ try:
+ out = execute_scalar(conn, "SELECT make_standard_name('a')")
+ except psycopg.Error as err:
+ return hint.format(error=str(err))
if out != 'a':
return hint.format(error='Unexpected result for make_standard_name()')
""" Recompute the frequency of full words.
"""
with connect(self.dsn) as conn:
- if conn.table_exists('search_name'):
+ if table_exists(conn, 'search_name'):
+ drop_tables(conn, "word_frequencies")
with conn.cursor() as cur:
- cur.drop_table("word_frequencies")
LOG.info("Computing word frequencies")
cur.execute("""CREATE TEMP TABLE word_frequencies AS
SELECT unnest(name_vector) as id, count(*)
cur.execute("""UPDATE word SET search_name_count = count
FROM word_frequencies
WHERE word_token like ' %' and word_id = id""")
- cur.drop_table("word_frequencies")
+ drop_tables(conn, "word_frequencies")
conn.commit()
"""
def __init__(self, dsn: str, normalizer: Any):
- self.conn: Optional[Connection] = connect(dsn).connection
+ self.conn: Optional[Connection] = connect(dsn)
self.conn.autocommit = True
self.normalizer = normalizer
- psycopg2.extras.register_hstore(self.conn)
+ register_hstore(self.conn)
self._cache = _TokenCache(self.conn)
""", (to_delete, ))
if to_add:
cur.execute("""SELECT count(create_postcode_id(pc))
- FROM unnest(%s) as pc
+ FROM unnest(%s::text[]) as pc
""", (to_add, ))
with self.conn.cursor() as cur:
# Get the old phrases.
existing_phrases = set()
- cur.execute("""SELECT word, class, type, operator FROM word
+ cur.execute("""SELECT word, class as cls, type, operator FROM word
WHERE class != 'place'
OR (type != 'house' AND type != 'postcode')""")
for label, cls, typ, oper in cur:
to_delete = existing_phrases - norm_phrases
if to_add:
- cur.execute_values(
+ cur.executemany(
""" INSERT INTO word (word_id, word_token, word, class, type,
search_name_count, operator)
(SELECT nextval('seq_word'), ' ' || make_standard_name(name), name,
class, type, 0,
CASE WHEN op in ('in', 'near') THEN op ELSE null END
- FROM (VALUES %s) as v(name, class, type, op))""",
+ FROM (VALUES (%s, %s, %s, %s)) as v(name, class, type, op))""",
to_add)
if to_delete and should_replace:
- cur.execute_values(
- """ DELETE FROM word USING (VALUES %s) as v(name, in_class, in_type, op)
+ cur.executemany(
+ """ DELETE FROM word
+ USING (VALUES (%s, %s, %s, %s)) as v(name, in_class, in_type, op)
WHERE word = name and class = in_class and type = in_type
and ((op = '-' and operator is null) or op = operator)""",
to_delete)
"""INSERT INTO word (word_id, word_token, country_code)
(SELECT nextval('seq_word'), lookup_token, %s
FROM (SELECT DISTINCT ' ' || make_standard_name(n) as lookup_token
- FROM unnest(%s)n) y
+ FROM unnest(%s::TEXT[])n) y
WHERE NOT EXISTS(SELECT * FROM word
WHERE word_token = lookup_token and country_code = %s))
""", (country_code, list(names.values()), country_code))
def add_names(self, conn: Connection, names: Mapping[str, str]) -> None:
""" Add token information for the names of the place.
"""
- with conn.cursor() as cur:
- # Create the token IDs for all names.
- self.data['names'] = cur.scalar("SELECT make_keywords(%s)::text",
+ # Create the token IDs for all names.
+ self.data['names'] = execute_scalar(conn, "SELECT make_keywords(%s)::text",
(names, ))
""" Add addr:street match terms.
"""
def _get_street(name: str) -> Optional[str]:
- with conn.cursor() as cur:
- return cast(Optional[str],
- cur.scalar("SELECT word_ids_from_name(%s)::text", (name, )))
+ return cast(Optional[str],
+ execute_scalar(conn, "SELECT word_ids_from_name(%s)::text", (name, )))
tokens = self.cache.streets.get(street, _get_street)
self.data['street'] = tokens or '{}'