""" Database helper functions for the indexer.
"""
import logging
+import select
+import time
+
import psycopg2
from psycopg2.extras import wait_select
return True
return False
+
+
+class WorkerPool:
+    """ A pool of asynchronous database connections.
+
+        The pool creates `pool_size` DBConnection objects and hands out
+        those that have finished their current command through
+        `next_free_worker()`.
+
+        The pool may be used as a context manager. When the context is
+        left, the pool waits for all connections to finish and then
+        closes them.
+    """
+    # Number of workers handed out by one free-worker generator after
+    # which every connection of the pool is reconnected.
+    REOPEN_CONNECTIONS_AFTER = 100000
+
+    def __init__(self, dsn, pool_size):
+        """ Create a pool of `pool_size` connections for the given `dsn`.
+        """
+        self.threads = [DBConnection(dsn) for _ in range(pool_size)]
+        self.free_workers = self._yield_free_worker()
+        # Accumulated time in seconds spent blocking in select() while
+        # no connection was ready.
+        self.wait_time = 0
+
+
+    def finish_all(self):
+        """ Wait for all connections to finish.
+
+            Afterwards the generator of free workers is restarted, which
+            also resets its count of handed-out workers.
+        """
+        for thread in self.threads:
+            while not thread.is_done():
+                thread.wait()
+
+        self.free_workers = self._yield_free_worker()
+
+    def close(self):
+        """ Close all connections and clear the pool.
+
+            The pool cannot hand out workers any more after this call
+            because the generator of free workers is dropped.
+        """
+        for thread in self.threads:
+            thread.close()
+        self.threads = []
+        self.free_workers = None
+
+
+    def next_free_worker(self):
+        """ Get the next free connection.
+
+            Blocks until one of the connections of the pool is done.
+        """
+        return next(self.free_workers)
+
+
+    def _yield_free_worker(self):
+        """ Generator that endlessly yields connections which are done.
+        """
+        ready = self.threads
+        command_stat = 0
+        while True:
+            for thread in ready:
+                if thread.is_done():
+                    command_stat += 1
+                    yield thread
+
+            if command_stat > self.REOPEN_CONNECTIONS_AFTER:
+                # Let each connection finish its work, then reconnect it.
+                for thread in self.threads:
+                    while not thread.is_done():
+                        thread.wait()
+                    thread.connect()
+                ready = self.threads
+                command_stat = 0
+            else:
+                # Block until select() reports a connection as ready.
+                # A monotonic clock keeps the measured interval correct
+                # even when the wall clock is adjusted in between.
+                tstart = time.monotonic()
+                _, ready, _ = select.select([], self.threads, [])
+                self.wait_time += time.monotonic() - tstart
+
+
+    def __enter__(self):
+        return self
+
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        # Close the connections even when waiting for them fails,
+        # otherwise an error in finish_all() would leak them.
+        try:
+            self.finish_all()
+        finally:
+            self.close()
Main work horse for indexing (computing addresses) the database.
"""
import logging
-import select
import time
import psycopg2.extras
from nominatim.indexer.progress import ProgressLogger
from nominatim.indexer import runners
-from nominatim.db.async_connection import DBConnection
+from nominatim.db.async_connection import DBConnection, WorkerPool
from nominatim.db.connection import connect
LOG = logging.getLogger()
self.conn.wait()
self.close()
-class WorkerPool:
- """ A pool of asynchronous database connections.
-
- The pool may be used as a context manager.
- """
- REOPEN_CONNECTIONS_AFTER = 100000
-
- def __init__(self, dsn, pool_size):
- self.threads = [DBConnection(dsn) for _ in range(pool_size)]
- self.free_workers = self._yield_free_worker()
- self.wait_time = 0
-
-
- def finish_all(self):
- """ Wait for all connection to finish.
- """
- for thread in self.threads:
- while not thread.is_done():
- thread.wait()
-
- self.free_workers = self._yield_free_worker()
-
- def close(self):
- """ Close all connections and clear the pool.
- """
- for thread in self.threads:
- thread.close()
- self.threads = []
- self.free_workers = None
-
-
- def next_free_worker(self):
- """ Get the next free connection.
- """
- return next(self.free_workers)
-
-
- def _yield_free_worker(self):
- ready = self.threads
- command_stat = 0
- while True:
- for thread in ready:
- if thread.is_done():
- command_stat += 1
- yield thread
-
- if command_stat > self.REOPEN_CONNECTIONS_AFTER:
- for thread in self.threads:
- while not thread.is_done():
- thread.wait()
- thread.connect()
- ready = self.threads
- command_stat = 0
- else:
- tstart = time.time()
- _, ready, _ = select.select([], self.threads, [])
- self.wait_time += time.time() - tstart
-
-
- def __enter__(self):
- return self
-
-
- def __exit__(self, exc_type, exc_value, traceback):
- self.finish_all()
- self.close()
-
class Indexer:
""" Main indexing routine.