from nominatim.errors import UsageError
from nominatim.db.sqlalchemy_schema import SearchTables
from nominatim.db.async_core_library import PGCORE_LIB, PGCORE_ERROR
+import nominatim.db.sqlite_functions
from nominatim.config import Configuration
from nominatim.api.connection import SearchConnection
from nominatim.api.status import get_status, StatusResult
from nominatim.api.results import DetailedResult, ReverseResult, SearchResults
-class NominatimAPIAsync:
+class NominatimAPIAsync: #pylint: disable=too-many-instance-attributes
""" The main frontend to the Nominatim database implements the
functions for lookup, forward and reverse geocoding using
asynchronous functions.
self.config = Configuration(project_dir, environ)
self.query_timeout = self.config.get_int('QUERY_TIMEOUT') \
if self.config.QUERY_TIMEOUT else None
+ self.reverse_restrict_to_country_area = self.config.get_bool('SEARCH_WITHIN_COUNTRIES')
self.server_version = 0
if sys.version_info >= (3, 10):
if self._engine:
return
- dsn = self.config.get_database_params()
- pool_size = self.config.get_int('API_POOL_SIZE')
-
- query = {k: v for k, v in dsn.items()
- if k not in ('user', 'password', 'dbname', 'host', 'port')}
-
- dburl = sa.engine.URL.create(
- f'postgresql+{PGCORE_LIB}',
- database=dsn.get('dbname'),
- username=dsn.get('user'), password=dsn.get('password'),
- host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
- query=query)
- engine = sa_asyncio.create_async_engine(dburl, future=True,
- max_overflow=0, pool_size=pool_size,
- echo=self.config.get_bool('DEBUG_SQL'))
-
- try:
- async with engine.begin() as conn:
- result = await conn.scalar(sa.text('SHOW server_version_num'))
- server_version = int(result)
- except (PGCORE_ERROR, sa.exc.OperationalError):
+ extra_args: Dict[str, Any] = {'future': True,
+ 'echo': self.config.get_bool('DEBUG_SQL')}
+
+ if self.config.get_int('API_POOL_SIZE') == 0:
+ extra_args['poolclass'] = sa.pool.NullPool
+ else:
+ extra_args['poolclass'] = sa.pool.AsyncAdaptedQueuePool
+ extra_args['max_overflow'] = 0
+ extra_args['pool_size'] = self.config.get_int('API_POOL_SIZE')
+
+
+ is_sqlite = self.config.DATABASE_DSN.startswith('sqlite:')
+
+ if is_sqlite:
+ params = dict((p.split('=', 1)
+ for p in self.config.DATABASE_DSN[7:].split(';')))
+ dburl = sa.engine.URL.create('sqlite+aiosqlite',
+ database=params.get('dbname'))
+
+ if not ('NOMINATIM_DATABASE_RW' in self.config.environ
+ and self.config.get_bool('DATABASE_RW')) \
+ and not Path(params.get('dbname', '')).is_file():
+ raise UsageError(f"SQlite database '{params.get('dbname')}' does not exist.")
+ else:
+ dsn = self.config.get_database_params()
+ query = {k: v for k, v in dsn.items()
+ if k not in ('user', 'password', 'dbname', 'host', 'port')}
+
+ dburl = sa.engine.URL.create(
+ f'postgresql+{PGCORE_LIB}',
+ database=dsn.get('dbname'),
+ username=dsn.get('user'),
+ password=dsn.get('password'),
+ host=dsn.get('host'),
+ port=int(dsn['port']) if 'port' in dsn else None,
+ query=query)
+
+ engine = sa_asyncio.create_async_engine(dburl, **extra_args)
+
+ if is_sqlite:
server_version = 0
- if server_version >= 110000:
@sa.event.listens_for(engine.sync_engine, "connect")
- def _on_connect(dbapi_con: Any, _: Any) -> None:
+ def _on_sqlite_connect(dbapi_con: Any, _: Any) -> None:
+ dbapi_con.run_async(lambda conn: conn.enable_load_extension(True))
+ nominatim.db.sqlite_functions.install_custom_functions(dbapi_con)
cursor = dbapi_con.cursor()
- cursor.execute("SET jit_above_cost TO '-1'")
- cursor.execute("SET max_parallel_workers_per_gather TO '0'")
- # Make sure that all connections get the new settings
- await self.close()
+ cursor.execute("SELECT load_extension('mod_spatialite')")
+ cursor.execute('SELECT SetDecimalPrecision(7)')
+ dbapi_con.run_async(lambda conn: conn.enable_load_extension(False))
+ else:
+ try:
+ async with engine.begin() as conn:
+ result = await conn.scalar(sa.text('SHOW server_version_num'))
+ server_version = int(result)
+ if server_version >= 110000:
+ await conn.execute(sa.text("SET jit_above_cost TO '-1'"))
+ await conn.execute(sa.text(
+ "SET max_parallel_workers_per_gather TO '0'"))
+ except (PGCORE_ERROR, sa.exc.OperationalError):
+ server_version = 0
+
+ if server_version >= 110000:
+ @sa.event.listens_for(engine.sync_engine, "connect")
+ def _on_connect(dbapi_con: Any, _: Any) -> None:
+ cursor = dbapi_con.cursor()
+ cursor.execute("SET jit_above_cost TO '-1'")
+ cursor.execute("SET max_parallel_workers_per_gather TO '0'")
self._property_cache['DB:server_version'] = server_version
- self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
+ self._tables = SearchTables(sa.MetaData()) # pylint: disable=no-member
self._engine = engine
conn.set_query_timeout(self.query_timeout)
if details.keywords:
await make_query_analyzer(conn)
- geocoder = ReverseGeocoder(conn, details)
+ geocoder = ReverseGeocoder(conn, details,
+ self.reverse_restrict_to_country_area)
return await geocoder.lookup(coord)