from ... import logging as loglib
from ..asgi_adaptor import ASGIAdaptor, EndpointFunc
+
class HTTPNominatimError(Exception):
""" A special exception class for errors raised during processing.
"""
self.content_type = content_type
-async def nominatim_error_handler(req: Request, resp: Response, #pylint: disable=unused-argument
+async def nominatim_error_handler(req: Request, resp: Response,
exception: HTTPNominatimError,
_: Any) -> None:
""" Special error handler that passes message and content type as
resp.content_type = exception.content_type
-async def timeout_error_handler(req: Request, resp: Response, #pylint: disable=unused-argument
- exception: TimeoutError, #pylint: disable=unused-argument
+async def timeout_error_handler(req: Request, resp: Response,
+ exception: TimeoutError,
_: Any) -> None:
""" Special error handler that passes message and content type as
per exception info.
self._config = config
self._formatter = formatter
-
def get(self, name: str, default: Optional[str] = None) -> Optional[str]:
    """ Look up the request parameter `name`.

        The lookup is delegated to the request's `get_param()`, with
        `default` handed through as the fallback value.
    """
    value = self.request.get_param(name, default=default)
    return value
-
def get_header(self, name: str, default: Optional[str] = None) -> Optional[str]:
    """ Look up the request header `name`.

        The lookup is delegated to the request's `get_header()`, with
        `default` handed through as the fallback value.
    """
    value = self.request.get_header(name, default=default)
    return value
-
def error(self, msg: str, status: int = 400) -> HTTPNominatimError:
    """ Build an HTTPNominatimError from the message `msg`, the status
        code `status` and this object's current content type.

        The exception is handed back to the caller, not raised here.
    """
    failure = HTTPNominatimError(msg, status, self.content_type)
    return failure
-
def create_response(self, status: int, output: str, num_results: int) -> None:
    """ Fill in the response with the outcome of the request.

        `status` and `output` become the status and text of the response,
        `num_results` is recorded on the response context and the content
        type is copied over from this object.
    """
    reply = self.response
    reply.context.num_results = num_results
    reply.status = status
    reply.text = output
    reply.content_type = self.content_type
-
def base_uri(self) -> str:
    """ Return the forwarded prefix reported by the request.
    """
    prefix = self.request.forwarded_prefix
    return prefix
self.api = api
self.formatter = formatter
-
async def on_get(self, req: Request, resp: Response) -> None:
""" Implementation of the endpoint.
"""
"""
def __init__(self, file_name: str):
- self.fd = open(file_name, 'a', buffering=1, encoding='utf8') # pylint: disable=R1732
-
+ self.fd = open(file_name, 'a', buffering=1, encoding='utf8')
async def process_request(self, req: Request, _: Response) -> None:
    """ Stamp the request with the time at which processing begins.

        The current timezone-aware UTC time is stored in
        `req.context.start`.
    """
    started_at = dt.datetime.now(tz=dt.timezone.utc)
    req.context.start = started_at
-
async def process_response(self, req: Request, resp: Response,
resource: Optional[EndpointWrapper],
req_succeeded: bool) -> None:
writes logs for successful requests for search, reverse, lookup and details.
"""
if not req_succeeded or resource is None or resp.status != 200\
- or resource.name not in ('reverse', 'search', 'lookup', 'details'):
+ or resource.name not in ('reverse', 'search', 'lookup', 'details'):
return
finish = dt.datetime.now(tz=dt.timezone.utc)
f'{resource.name} "{params}"\n')
-class APIShutdown:
- """ Middleware that closes any open database connections.
+class APIMiddleware:
+ """ Middleware managing the Nominatim database connection.
"""
- def __init__(self, api: NominatimAPIAsync) -> None:
- self.api = api
+ def __init__(self, project_dir: Path, environ: Optional[Mapping[str, str]]) -> None:
+ self.api = NominatimAPIAsync(project_dir, environ)
+ self.app: Optional[App] = None
+
+ @property
+ def config(self) -> Configuration:
+ """ Get the configuration for Nominatim.
+ """
+ return self.api.config
+
+ def set_app(self, app: App) -> None:
+ """ Set the Falcon application this middleware is connected to.
+ """
+ self.app = app
+
+ async def process_startup(self, *_: Any) -> None:
+ """ Process the ASGI lifespan startup event.
+ """
+ assert self.app is not None
+ legacy_urls = self.api.config.get_bool('SERVE_LEGACY_URLS')
+ formatter = load_format_dispatcher('v1', self.api.config.project_dir)
+ for name, func in await api_impl.get_routes(self.api):
+ endpoint = EndpointWrapper(name, func, self.api, formatter)
+ self.app.add_route(f"/{name}", endpoint)
+ if legacy_urls:
+ self.app.add_route(f"/{name}.php", endpoint)
async def process_shutdown(self, *_: Any) -> None:
"""Process the ASGI lifespan shutdown event.
environ: Optional[Mapping[str, str]] = None) -> App:
""" Create a Nominatim Falcon ASGI application.
"""
- api = NominatimAPIAsync(project_dir, environ)
+ apimw = APIMiddleware(project_dir, environ)
- middleware: List[object] = [APIShutdown(api)]
- log_file = api.config.LOG_FILE
+ middleware: List[object] = [apimw]
+ log_file = apimw.config.LOG_FILE
if log_file:
middleware.append(FileLoggingMiddleware(log_file))
- app = App(cors_enable=api.config.get_bool('CORS_NOACCESSCONTROL'),
+ app = App(cors_enable=apimw.config.get_bool('CORS_NOACCESSCONTROL'),
middleware=middleware)
+
+ apimw.set_app(app)
app.add_error_handler(HTTPNominatimError, nominatim_error_handler)
app.add_error_handler(TimeoutError, timeout_error_handler)
# different from TimeoutError in Python <= 3.10
- app.add_error_handler(asyncio.TimeoutError, timeout_error_handler) # type: ignore[arg-type]
-
- legacy_urls = api.config.get_bool('SERVE_LEGACY_URLS')
- formatter = load_format_dispatcher('v1', project_dir)
- for name, func in api_impl.ROUTES:
- endpoint = EndpointWrapper(name, func, api, formatter)
- app.add_route(f"/{name}", endpoint)
- if legacy_urls:
- app.add_route(f"/{name}.php", endpoint)
+ app.add_error_handler(asyncio.TimeoutError, timeout_error_handler) # type: ignore[arg-type]
return app