# Copyright (C) 2022 by the Nominatim developer community.
# For a full list of authors see the git log.
from pathlib import Path
+import importlib
import sys
import tempfile
self.api_db_done = False
self.website_dir = None
+ self.api_engine = None
+ if config['API_ENGINE'] != 'php':
+ if not hasattr(self, f"create_api_request_func_{config['API_ENGINE']}"):
+ raise RuntimeError(f"Unknown API engine '{config['API_ENGINE']}'")
+ self.api_engine = getattr(self, f"create_api_request_func_{config['API_ENGINE']}")()
+
def connect_database(self, dbname):
""" Return a connection to the database with the given name.
Uses configured host, user and port.
WHERE class='place' and type='houses'
and osm_type='W'
and ST_GeometryType(geometry) = 'ST_LineString'""")
+
+
+ def create_api_request_func_starlette(self):
+ import nominatim.server.starlette.server
+ from asgi_lifespan import LifespanManager
+ import httpx
+
+ async def _request(endpoint, params, project_dir, environ, http_headers):
+ app = nominatim.server.starlette.server.get_application(project_dir, environ)
+
+ async with LifespanManager(app):
+ async with httpx.AsyncClient(app=app, base_url="http://nominatim.test") as client:
+ response = await client.get(f"/{endpoint}", params=params,
+ headers=http_headers)
+
+ return response.text, response.status_code
+
+ return _request
+
+
+ def create_api_request_func_falcon(self):
+ import nominatim.server.falcon.server
+ import falcon.testing
+
+ async def _request(endpoint, params, project_dir, environ, http_headers):
+ app = nominatim.server.falcon.server.get_application(project_dir, environ)
+
+ async with falcon.testing.ASGIConductor(app) as conductor:
+ response = await conductor.get(f"/{endpoint}", params=params,
+ headers=http_headers)
+
+ return response.text, response.status_code
+
+ return _request
+
+
+