Also switches to our internal connect function which gives us
a cursor with a scalar() function.
import logging
import select
import logging
import select
from nominatim.indexer.progress import ProgressLogger
from nominatim.indexer import runners
from nominatim.db.async_connection import DBConnection
from nominatim.indexer.progress import ProgressLogger
from nominatim.indexer import runners
from nominatim.db.async_connection import DBConnection
+from nominatim.db.connection import connect
LOG = logging.getLogger()
LOG = logging.getLogger()
def __init__(self, dsn, num_threads):
self.dsn = dsn
self.num_threads = num_threads
def __init__(self, dsn, num_threads):
self.dsn = dsn
self.num_threads = num_threads
self.threads = []
def _setup_connections(self):
self.threads = []
def _setup_connections(self):
- self.conn = psycopg2.connect(self.dsn)
self.threads = [DBConnection(self.dsn) for _ in range(self.num_threads)]
def _close_connections(self):
self.threads = [DBConnection(self.dsn) for _ in range(self.num_threads)]
def _close_connections(self):
- if self.conn:
- self.conn.close()
- self.conn = None
-
for thread in self.threads:
thread.close()
self.threads = []
for thread in self.threads:
thread.close()
self.threads = []
database will be analysed at the appropriate places to
ensure that database statistics are updated.
"""
database will be analysed at the appropriate places to
ensure that database statistics are updated.
"""
- with psycopg2.connect(self.dsn) as conn:
+ with connect(self.dsn) as conn:
conn.autocommit = True
if analyse:
conn.autocommit = True
if analyse:
def update_status_table(self):
""" Update the status in the status table to 'indexed'.
"""
def update_status_table(self):
""" Update the status in the status table to 'indexed'.
"""
- conn = psycopg2.connect(self.dsn)
-
- try:
+ with connect(self.dsn) as conn:
with conn.cursor() as cur:
cur.execute('UPDATE import_status SET indexed = true')
conn.commit()
with conn.cursor() as cur:
cur.execute('UPDATE import_status SET indexed = true')
conn.commit()
- finally:
- conn.close()
def _index(self, runner, batch=1):
""" Index a single rank or table. `runner` describes the SQL to use
def _index(self, runner, batch=1):
""" Index a single rank or table. `runner` describes the SQL to use
"""
LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
"""
LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
- cur = self.conn.cursor()
- cur.execute(runner.sql_count_objects())
-
- total_tuples = cur.fetchone()[0]
- LOG.debug("Total number of rows: %i", total_tuples)
+ with connect(self.dsn) as conn:
+ with conn.cursor() as cur:
+ total_tuples = cur.scalar(runner.sql_count_objects())
+ LOG.debug("Total number of rows: %i", total_tuples)
- progress = ProgressLogger(runner.name(), total_tuples)
+ progress = ProgressLogger(runner.name(), total_tuples)
- if total_tuples > 0:
- cur = self.conn.cursor(name='places')
- cur.execute(runner.sql_get_objects())
+ if total_tuples > 0:
+ with conn.cursor(name='places') as cur:
+ cur.execute(runner.sql_get_objects())
- next_thread = self.find_free_thread()
- while True:
- places = [p[0] for p in cur.fetchmany(batch)]
- if not places:
- break
+ next_thread = self.find_free_thread()
+ while True:
+ places = [p[0] for p in cur.fetchmany(batch)]
+ if not places:
+ break
- LOG.debug("Processing places: %s", str(places))
- thread = next(next_thread)
+ LOG.debug("Processing places: %s", str(places))
+ thread = next(next_thread)
- thread.perform(runner.sql_index_place(places))
- progress.add(len(places))
+ thread.perform(runner.sql_index_place(places))
+ progress.add(len(places))
- for thread in self.threads:
- thread.wait()
+ for thread in self.threads:
+ thread.wait()