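Use the SQL composition helpers supplied with psycopg2 (the `psycopg2.sql`
module: `SQL`, `Identifier`, `Literal`) whenever a query needs to be put
together from snippets, instead of plain Python string formatting.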
import functools
import psycopg2.extras
import functools
import psycopg2.extras
+from psycopg2 import sql as pysql
+def _mk_valuelist(template, num):
+ return pysql.SQL(',').join([pysql.SQL(template)] * num)
+
class AbstractPlacexRunner:
""" Returns SQL commands for indexing of the placex table.
"""
class AbstractPlacexRunner:
""" Returns SQL commands for indexing of the placex table.
"""
- SELECT_SQL = 'SELECT place_id FROM placex'
+ SELECT_SQL = pysql.SQL('SELECT place_id FROM placex ')
def __init__(self, rank, analyzer):
self.rank = rank
def __init__(self, rank, analyzer):
self.rank = rank
@staticmethod
@functools.lru_cache(maxsize=1)
def _index_sql(num_places):
@staticmethod
@functools.lru_cache(maxsize=1)
def _index_sql(num_places):
- return """ UPDATE placex
- SET indexed_status = 0, address = v.addr, token_info = v.ti
- FROM (VALUES {}) as v(id, addr, ti)
- WHERE place_id = v.id
- """.format(','.join(["(%s, %s::hstore, %s::jsonb)"] * num_places))
+ return pysql.SQL(
+ """ UPDATE placex
+ SET indexed_status = 0, address = v.addr, token_info = v.ti
+ FROM (VALUES {}) as v(id, addr, ti)
+ WHERE place_id = v.id
+ """).format(_mk_valuelist("(%s, %s::hstore, %s::jsonb)", num_places))
return "rank {}".format(self.rank)
def sql_count_objects(self):
return "rank {}".format(self.rank)
def sql_count_objects(self):
- return """SELECT count(*) FROM placex
- WHERE rank_address = {} and indexed_status > 0
- """.format(self.rank)
+ return pysql.SQL("""SELECT count(*) FROM placex
+ WHERE rank_address = {} and indexed_status > 0
+ """).format(pysql.Literal(self.rank))
def sql_get_objects(self):
def sql_get_objects(self):
- return """{} WHERE indexed_status > 0 and rank_address = {}
- ORDER BY geometry_sector
- """.format(self.SELECT_SQL, self.rank)
+ return self.SELECT_SQL + pysql.SQL(
+ """WHERE indexed_status > 0 and rank_address = {}
+ ORDER BY geometry_sector
+ """).format(pysql.Literal(self.rank))
class BoundaryRunner(AbstractPlacexRunner):
class BoundaryRunner(AbstractPlacexRunner):
return "boundaries rank {}".format(self.rank)
def sql_count_objects(self):
return "boundaries rank {}".format(self.rank)
def sql_count_objects(self):
- return """SELECT count(*) FROM placex
- WHERE indexed_status > 0
- AND rank_search = {}
- AND class = 'boundary' and type = 'administrative'
- """.format(self.rank)
+ return pysql.SQL("""SELECT count(*) FROM placex
+ WHERE indexed_status > 0
+ AND rank_search = {}
+ AND class = 'boundary' and type = 'administrative'
+ """).format(pysql.Literal(self.rank))
def sql_get_objects(self):
def sql_get_objects(self):
- return """{} WHERE indexed_status > 0 and rank_search = {}
- and class = 'boundary' and type = 'administrative'
- ORDER BY partition, admin_level
- """.format(self.SELECT_SQL, self.rank)
+ return self.SELECT_SQL + pysql.SQL(
+ """WHERE indexed_status > 0 and rank_search = {}
+ and class = 'boundary' and type = 'administrative'
+ ORDER BY partition, admin_level
+ """).format(pysql.Literal(self.rank))
class InterpolationRunner:
class InterpolationRunner:
@staticmethod
@functools.lru_cache(maxsize=1)
def _index_sql(num_places):
@staticmethod
@functools.lru_cache(maxsize=1)
def _index_sql(num_places):
- return """ UPDATE location_property_osmline
- SET indexed_status = 0, address = v.addr, token_info = v.ti
- FROM (VALUES {}) as v(id, addr, ti)
- WHERE place_id = v.id
- """.format(','.join(["(%s, %s::hstore, %s::jsonb)"] * num_places))
+ return pysql.SQL("""UPDATE location_property_osmline
+ SET indexed_status = 0, address = v.addr, token_info = v.ti
+ FROM (VALUES {}) as v(id, addr, ti)
+ WHERE place_id = v.id
+ """).format(_mk_valuelist("(%s, %s::hstore, %s::jsonb)", num_places))
def index_places(self, worker, places):
def index_places(self, worker, places):
@staticmethod
def index_places(worker, ids):
@staticmethod
def index_places(worker, ids):
- worker.perform(""" UPDATE location_postcode SET indexed_status = 0
- WHERE place_id IN ({})
- """.format(','.join((str(i[0]) for i in ids))))
+ worker.perform(pysql.SQL("""UPDATE location_postcode SET indexed_status = 0
+ WHERE place_id IN ({})""")
+ .format(pysql.SQL(',').join((pysql.Literal(i[0]) for i in ids))))
if word_tokens:
cur.execute("""INSERT INTO word (word_id, word_token, country_code,
search_name_count)
if word_tokens:
cur.execute("""INSERT INTO word (word_id, word_token, country_code,
search_name_count)
- (SELECT nextval('seq_word'), token, '{}', 0
+ (SELECT nextval('seq_word'), token, %s, 0
FROM unnest(%s) as token)
FROM unnest(%s) as token)
- """.format(country_code), (list(word_tokens),))
+ """, (country_code, list(word_tokens)))
def process_place(self, place):
def process_place(self, place):
import psutil
import psycopg2.extras
import psutil
import psycopg2.extras
+from psycopg2 import sql as pysql
from nominatim.db.connection import connect, get_pg_env
from nominatim.db import utils as db_utils
from nominatim.db.connection import connect, get_pg_env
from nominatim.db import utils as db_utils
-_COPY_COLUMNS = 'osm_type, osm_id, class, type, name, admin_level, address, extratags, geometry'
+_COPY_COLUMNS = pysql.SQL(',').join(map(pysql.Identifier,
+ ('osm_type', 'osm_id', 'class', 'type',
+ 'name', 'admin_level', 'address',
+ 'extratags', 'geometry')))
def load_data(dsn, threads):
def load_data(dsn, threads):
for imod in range(place_threads):
conn = DBConnection(dsn)
conn.connect()
for imod in range(place_threads):
conn = DBConnection(dsn)
conn.connect()
- conn.perform("""INSERT INTO placex ({0})
- SELECT {0} FROM place
- WHERE osm_id % {1} = {2}
- AND NOT (class='place' and (type='houses' or type='postcode'))
- AND ST_IsValid(geometry)
- """.format(_COPY_COLUMNS, place_threads, imod))
+ conn.perform(
+ pysql.SQL("""INSERT INTO placex ({columns})
+ SELECT {columns} FROM place
+ WHERE osm_id % {total} = {mod}
+ AND NOT (class='place' and (type='houses' or type='postcode'))
+ AND ST_IsValid(geometry)
+ """).format(columns=_COPY_COLUMNS,
+ total=pysql.Literal(place_threads),
+ mod=pysql.Literal(imod)))
sel.register(conn, selectors.EVENT_READ, conn)
# Address interpolations go into another table.
sel.register(conn, selectors.EVENT_READ, conn)
# Address interpolations go into another table.
"""
from pathlib import Path
"""
from pathlib import Path
+from psycopg2 import sql as pysql
+
UPDATE_TABLES = [
'address_levels',
'gb_postcode',
UPDATE_TABLES = [
'address_levels',
'gb_postcode',
""" Drop all tables only necessary for updating the database from
OSM replication data.
"""
""" Drop all tables only necessary for updating the database from
OSM replication data.
"""
-
- where = ' or '.join(["(tablename LIKE '{}')".format(t) for t in UPDATE_TABLES])
+ parts = (pysql.SQL("(tablename LIKE {})").format(pysql.Literal(t)) for t in UPDATE_TABLES)
with conn.cursor() as cur:
with conn.cursor() as cur:
- cur.execute("SELECT tablename FROM pg_tables WHERE " + where)
+ cur.execute(pysql.SQL("SELECT tablename FROM pg_tables WHERE ")
+ + pysql.SQL(' or ').join(parts))
tables = [r[0] for r in cur]
for table in tables:
tables = [r[0] for r in cur]
for table in tables:
rank_address SMALLINT)""".format(table))
cur.execute_values(pysql.SQL("INSERT INTO {} VALUES %s")
rank_address SMALLINT)""".format(table))
cur.execute_values(pysql.SQL("INSERT INTO {} VALUES %s")
- .format(pysql.Identifier(table)), rows)
+ .format(pysql.Identifier(table)), rows)
cur.execute('CREATE UNIQUE INDEX ON {} (country_code, class, type)'.format(table))
cur.execute('CREATE UNIQUE INDEX ON {} (country_code, class, type)'.format(table))