munin/nominatim_requests
DESTINATION ${NOMINATIM_MUNINDIR})
endif()
+
+message(WARNING "Building with CMake is deprecated and will be removed in Nominatim 5.0."
+ "Use Nominatim pip packages instead.\n"
+ "See https://nominatim.org/release-docs/develop/admin/Installation/#downloading-and-building-nominatim")
config.vm.define "ubuntu22", primary: true do |sub|
sub.vm.box = "generic/ubuntu2204"
- if RUBY_PLATFORM.include?('darwin') && RUBY_PLATFORM.include?('arm64')
- # Apple M processor
- sub.vm.box = 'luminositylabsllc/ubuntu-22.04-arm64'
- end
sub.vm.provision :shell do |s|
s.path = "vagrant/Install-on-Ubuntu-22.sh"
s.privileged = false
config.vm.define "ubuntu24" do |sub|
sub.vm.box = "bento/ubuntu-24.04"
+ if RUBY_PLATFORM.include?('darwin') && RUBY_PLATFORM.include?('arm64')
+ # Apple M processor
+ sub.vm.box = 'gutehall/ubuntu24-04'
+ end
sub.vm.provision :shell do |s|
s.path = "vagrant/Install-on-Ubuntu-24.sh"
s.privileged = false
# Deploying Nominatim using the PHP frontend
+!!! danger
+ The PHP frontend is deprecated and will be removed in Nominatim 5.0.
+
The Nominatim API is implemented as a PHP application. The `website/` directory
in the project directory contains the configured website. You can serve this
in a production environment with any web server that is capable of running
#### Testing the PHP frontend
+!!! danger
+ The PHP frontend is deprecated and will be removed in Nominatim 5.0.
+
You can run a small test server with the PHP frontend like this:
```sh
* [starlette](https://www.starlette.io/)
* [uvicorn](https://www.uvicorn.org/)
-For running the legacy PHP frontend:
+For running the legacy PHP frontend (deprecated, will be removed in Nominatim 5.0):
* [PHP](https://php.net) (7.3+)
* PHP-pgsql
cmake: `cmake -DBUILD_MODULE=on ../Nominatim`. To compile the module
you need to have the server development headers for PostgreSQL installed.
On Ubuntu/Debian run: `sudo apt install postgresql-server-dev-<postgresql version>`
+ The legacy tokenizer is deprecated and will be removed in Nominatim 5.0.
Nominatim installs itself into `/usr/local` per default. To choose a different
# Database Migrations
-Since version 3.7.0 Nominatim offers automatic migrations. Please follow
+Nominatim offers automatic migrations since version 3.7. Please follow
the following steps:
-* stop any updates that are potentially running
-* update Nominatim to the newer version
-* go to your project directory and run `nominatim admin --migrate`
-* (optionally) restart updates
+* Stop any updates that are potentially running
+* Update the backend: `pip install -U nominatim-db`
+* Go to your project directory and run `nominatim admin --migrate`
+* Update the frontend: `pip install -U nominatim-api`
+* (optionally) Restart updates
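+
+For a pip-based installation, the sequence of commands might look roughly like
+this (the project directory path is only an example):
+
+```sh
+# stop any running update processes first
+pip install -U nominatim-db
+cd /srv/nominatim-project    # your project directory
+nominatim admin --migrate
+pip install -U nominatim-api
+# (optionally) restart the update processes
+```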
+
+If you are still using CMake for the installation of Nominatim, then you
+need to update the software in one step before migrating the database.
+It is not recommended to do this while the machine is serving requests.
Below you find additional migrations and hints about other structural and
breaking changes. **Please read them before running the migration.**
can be set in your local `.env` configuration
* [Import styles](Import-Styles.md) explains how to write your own import style
in order to control what kind of OSM data will be imported
+* [API Result Formatting](Result-Formatting.md) shows how to change the
+ output of the Nominatim API
* [Place ranking](Ranking.md) describes the configuration around classifying
places in terms of their importance and their role in an address
* [Tokenizers](Tokenizers.md) describes the configuration of the module
--- /dev/null
+# Changing the Appearance of Results in the Server API
+
+The Nominatim Server API offers a number of formatting options that
+present search results in [different output formats](../api/Output.md).
+These results only contain a subset of all the information that Nominatim
+has about the result. This page explains how to adapt the result output
+or add additional result formatting.
+
+## Defining custom result formatting
+
+To change the result output, you need to place a file `api/v1/format.py`
+into your project directory. This file needs to define a single variable
+`dispatch` containing a [FormatDispatcher](#formatdispatcher). This class
+serves to collect the functions for formatting the different result types
+and offers helper functions to apply the formatters.
+
+There are two ways to define the `dispatch` variable. If you want to reuse
+the default output formatting and just make some changes or add an additional
+format type, then import the dispatch object from the default API:
+
+``` python
+from nominatim_api.v1.format import dispatch as dispatch
+```
+
+If you prefer to define a completely new result output, then you can
+create an empty dispatcher object:
+
+``` python
+from nominatim_api import FormatDispatcher
+
+dispatch = FormatDispatcher()
+```
+
+## The formatting function
+
+The dispatcher organises the formatting functions by format and result type.
+The format corresponds to the `format` parameter of the API. It can contain
+one of the predefined format names or you can invent your own new format.
+
+API calls return either a single data class or a list of data classes
+representing the result. You need to make sure there are formatters defined
+for the following result types:
+
+* StatusResult (single object, returned by `/status`)
+* DetailedResult (single object, returned by `/details`)
+* SearchResults (list of objects, returned by `/search`)
+* ReverseResults (list of objects, returned by `/reverse` and `/lookup`)
+* RawDataList (simple object, returned by `/deletable` and `/polygons`)
+
+A formatter function has the following signature:
+
+``` python
+def format_func(result: ResultType, options: Mapping[str, Any]) -> str
+```
+
+The options dictionary contains additional information about the original
+query. See the [reference below](#options-for-different-result-types)
+about the possible options.
+
+To set the result formatter for a certain result type and format, you need
+to write the format function and decorate it with the
+[`format_func`](#nominatim_api.FormatDispatcher.format_func)
+decorator.
+
+For example, let us extend the result for the status call in text format
+and add the server URL. Such a formatter would look like this:
+
+``` python
+from nominatim_api import StatusResult
+
+@dispatch.format_func(StatusResult, 'text')
+def _format_status_text(result, _):
+ header = 'Status for server nominatim.openstreetmap.org'
+ if result.status:
+ return f"{header}\n\nERROR: {result.message}"
+
+ return f"{header}\n\nOK"
+```
+
+If your dispatcher is derived from the default one, then this definition
+will overwrite the original formatter function. This way it is possible
+to customize the output of selected results.
+
+## Adding new formats
+
+You may also define a completely different output format. This is as simple
+as adding formatting functions for all result types using the custom
+format name:
+
+``` python
+from nominatim_api import StatusResult
+
+@dispatch.format_func(StatusResult, 'chatty')
+def _format_status_text(result, _):
+ if result.status:
+ return f"The server is currently not running. {result.message}"
+
+ return "Good news! The server is running just fine."
+```
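+
+List-type results such as `SearchResults` are handed to the formatter as a
+whole, so the formatter itself iterates over the list. A minimal sketch for
+the same made-up 'chatty' format, assuming the result objects expose
+`place_id` and `centroid` attributes, could look like this:
+
+``` python
+from nominatim_api import SearchResults
+
+@dispatch.format_func(SearchResults, 'chatty')
+def _format_search_chatty(results, _):
+    if not results:
+        return "Sorry, nothing was found."
+
+    # one line per result: place ID and WGS84 coordinates of the centroid
+    lines = [f"* {r.place_id} at {r.centroid.lat:.5f}, {r.centroid.lon:.5f}"
+             for r in results]
+    return "\n".join(lines)
+```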
+
+That's all. Nominatim will automatically pick up the new format name and
+will allow the user to use it. When you invent a new format, there is no need
+to implement formatter functions for all the result types. The available
+formats will be determined for each API endpoint separately.
+To find out which formats are available, you can use the `--list-formats`
+option of the CLI tool:
+
+```
+me@machine:planet-project$ nominatim status --list-formats
+2024-08-16 19:54:00: Using project directory: /home/nominatim/planet-project
+text
+json
+chatty
+debug
+me@machine:planet-project$
+```
+
+The `debug` format listed in the last line will always appear. It is a
+special format that enables debug output via the command line (the same
+as the `debug=1` parameter enables for the server API). To avoid clashing
+with this built-in format, you shouldn't name your own format 'debug'.
+
+### Content type of new formats
+
+All responses will be returned with the content type `application/json` by
+default. If your format produces a different content type, you need
+to configure the content type with the `set_content_type()` function.
+
+For example, the 'chatty' format above returns just simple text. So the
+content type should be set up as:
+
+``` python
+from nominatim_api.server.content_types import CONTENT_TEXT
+
+dispatch.set_content_type('chatty', CONTENT_TEXT)
+```
+
+The `content_types` module used above provides constants for the most
+frequent content types. You can set the content type to an arbitrary string
+if the content type you need is not available.
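+
+For instance, a hypothetical 'csv' format producing comma-separated output
+could register a MIME type that is not covered by the constants:
+
+``` python
+dispatch.set_content_type('csv', 'text/csv; charset=utf-8')
+```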
+
+## Formatting error messages
+
+Any exception thrown during processing of a request is given to
+a special error formatting function. It takes the requested content type,
+the status code and the error message. It should return the error message
+in a form appropriate for the given content type.
+
+You can overwrite the default formatting function with the decorator
+`error_format_func`:
+
+``` python
+import nominatim_api.server.content_types as ct
+
+@dispatch.error_format_func
+def _format_error(content_type: str, msg: str, status: int) -> str:
+ if content_type == ct.CONTENT_XML:
+ return f"""<?xml version="1.0" encoding="UTF-8" ?>
+ <message>{msg}</message>
+ """
+ if content_type == ct.CONTENT_JSON:
+ return f'"{msg}"'
+
+ return f"ERROR: {msg}"
+```
+
+
+## Debugging custom formatters
+
+The easiest way to try out your custom formatter is by using the Nominatim
+CLI commands. Custom formats can be chosen with the `--format` parameter:
+
+```
+me@machine:planet-project$ nominatim status --format chatty
+2024-08-16 19:54:00: Using project directory: /home/nominatim/planet-project
+Good news! The server is running just fine.
+me@machine:planet-project$
+```
+
+The CLI commands will also print the full error messages when there is a
+problem with the formatting code that you need to debug.
+
+!!! danger
+ In some cases, when you make an error with your import statement, the
+ CLI will not give you an error but instead tell you that the API
+ commands are no longer available:
+
+ me@machine: nominatim status
+ usage: nominatim [-h] [--version] {import,freeze,replication,special-phrases,add-data,index,refresh,admin} ...
+ nominatim: error: argument subcommand: invalid choice: 'status'
+
+ This happens because the CLI tool is meant to still work when the
+ nominatim-api package is not installed. Import errors involving
+ `nominatim_api` are interpreted as "package not installed".
+
+ Use the help command to find out which import could not be found:
+
+ me@machine: nominatim -h
+ ... [other help text] ...
+ Nominatim API package not found (was looking for module: nominatim_api.xxx).
+
+## Reference
+
+### FormatDispatcher
+
+::: nominatim_api.FormatDispatcher
+ options:
+ heading_level: 6
+ group_by_category: False
+
+### JsonWriter
+
+::: nominatim_api.utils.json_writer.JsonWriter
+ options:
+ heading_level: 6
+ group_by_category: False
+
+### Options for different result types
+
+This section lists the options that may be handed in with the different result
+types in the v1 version of the Nominatim API.
+
+#### StatusResult
+
+_None._
+
+#### DetailedResult
+
+| Option | Description |
+|-----------------|-------------|
+| locales | [Locale](../library/Result-Handling.md#locale) object for the requested language(s) |
+| group_hierarchy | Setting of [group_hierarchy](../api/Details.md#output-details) parameter |
+| icon_base_url | (optional) URL pointing to icons as set in [NOMINATIM_MAPICON_URL](Settings.md#nominatim_mapicon_url) |
+
+#### SearchResults
+
+| Option | Description |
+|-----------------|-------------|
+| query | Original query string |
+| more_url | URL for requesting additional results for the same query |
+| exclude_place_ids | List of place IDs already returned |
+| viewbox | Setting of [viewbox](../api/Search.md#result-restriction) parameter |
+| extratags | Setting of [extratags](../api/Search.md#output-details) parameter |
+| namedetails | Setting of [namedetails](../api/Search.md#output-details) parameter |
+| addressdetails | Setting of [addressdetails](../api/Search.md#output-details) parameter |
+
+#### ReverseResults
+
+| Option | Description |
+|-----------------|-------------|
+| query | Original query string |
+| extratags | Setting of [extratags](../api/Search.md#output-details) parameter |
+| namedetails | Setting of [namedetails](../api/Search.md#output-details) parameter |
+| addressdetails | Setting of [addressdetails](../api/Search.md#output-details) parameter |
+
+#### RawDataList
+
+_None._
* SQLite (>= 3.30)
* Spatialite (> 5.0.0)
+* aiosqlite
On Ubuntu/Debian, you can run:
sudo apt install sqlite3 libsqlite3-mod-spatialite libspatialite7
+Install the aiosqlite Python package in your virtual environment:
+
+ /srv/nominatim-venv/bin/pip install aiosqlite
+
## Creating a new SQLite database
Nominatim cannot import directly into an SQLite database. Instead you have to
## Legacy tokenizer
+!!! danger
+ The Legacy tokenizer is deprecated and will be removed in Nominatim 5.0.
+ If you still use a database with the legacy tokenizer, you must reimport
+ it using the ICU tokenizer below.
+
The legacy tokenizer implements the analysis algorithms of older Nominatim
versions. It uses a special Postgresql module to normalize names and queries.
This tokenizer is automatically installed and used when upgrading an older
The easiest way to handle these Python dependencies is to run your
development from within a virtual environment.
+```sh
+sudo apt install libsqlite3-mod-spatialite php-cli
+```
+
To set up the virtual environment with all necessary packages run:
```sh
mkdocs mkdocstrings mkdocs-gen-files pytest pytest-asyncio pylint \
types-jinja2 types-markupsafe types-psutil types-psycopg2 \
types-pygments types-pyyaml types-requests types-ujson \
- types-urllib3 typing-extensions unicorn falcon
+ types-urllib3 typing-extensions unicorn falcon starlette \
+ uvicorn mypy osmium aiosqlite
```
Now enter the virtual environment whenever you want to develop:
-- formatted postcode and therefore 'postcode' contains a derived
-- variant.
CASE WHEN address ? 'postcode' THEN placex.postcode ELSE NULL::text END as postcode,
- substring(address->'housenumber','[0-9]+')::integer as hnr
+ (address->'housenumber')::integer as hnr
FROM placex, generate_series(1, array_upper(waynodes, 1)) nodeidpos
WHERE osm_type = 'N' and osm_id = waynodes[nodeidpos]::BIGINT
and address is not NULL and address ? 'housenumber'
+ and address->'housenumber' ~ '^[0-9]{1,6}$'
and ST_Distance(NEW.linegeo, geometry) < 0.0005
ORDER BY nodeidpos
LOOP
DECLARE
place_centre GEOMETRY;
nearcountry RECORD;
+ countries TEXT[];
BEGIN
place_centre := ST_PointOnSurface(place);
-- RAISE WARNING 'get_country_code, start: %', ST_AsText(place_centre);
-- Try for a OSM polygon
- FOR nearcountry IN
- SELECT country_code from location_area_country
- WHERE country_code is not null and st_covers(geometry, place_centre) limit 1
- LOOP
- RETURN nearcountry.country_code;
- END LOOP;
+ SELECT array_agg(country_code) FROM location_area_country
+ WHERE country_code is not null and st_covers(geometry, place_centre)
+ INTO countries;
+
+ IF array_length(countries, 1) = 1 THEN
+ RETURN countries[1];
+ END IF;
+
+ IF array_length(countries, 1) > 1 THEN
+ -- more than one country found, check the fallback data to decide which one to use
+ FOR nearcountry IN
+ SELECT country_code FROM country_osm_grid
+ WHERE ST_Covers(geometry, place_centre) AND country_code = ANY(countries)
+ ORDER BY area ASC
+ LOOP
+ RETURN nearcountry.country_code;
+ END LOOP;
+ -- Still nothing? Choose the country code with the smallest partition number.
+ -- And failing that, just go by the alphabet.
+ FOR nearcountry IN
+ SELECT cc,
+ (SELECT partition FROM country_name WHERE country_code = cc) as partition
+ FROM unnest(countries) cc
+ ORDER BY partition, cc
+ LOOP
+ RETURN nearcountry.cc;
+ END LOOP;
+
+ -- Should never be reached.
+ RETURN countries[1];
+ END IF;
-- RAISE WARNING 'osm fallback: %', ST_AsText(place_centre);
- 'Overview': 'customize/Overview.md'
- 'Import Styles': 'customize/Import-Styles.md'
- 'Configuration Settings': 'customize/Settings.md'
+ - 'API Result Formatting': 'customize/Result-Formatting.md'
- 'Per-Country Data': 'customize/Country-Settings.md'
- 'Place Ranking' : 'customize/Ranking.md'
- 'Importance' : 'customize/Importance.md'
SearchResult as SearchResult,
SearchResults as SearchResults)
from .localization import (Locales as Locales)
+from .result_formatting import (FormatDispatcher as FormatDispatcher,
+ load_format_dispatcher as load_format_dispatcher)
from .version import NOMINATIM_API_VERSION as __version__
This class shares most of the functions with its synchronous
version. There are some additional functions or parameters,
which are documented below.
+
+ This class should usually be used as a context manager in a 'with' statement.
"""
def __init__(self, project_dir: Path,
environ: Optional[Mapping[str, str]] = None,
await self._engine.dispose()
+ async def __aenter__(self) -> 'NominatimAPIAsync':
+ return self
+
+
+ async def __aexit__(self, *_: Any) -> None:
+ await self.close()
+
+
@contextlib.asynccontextmanager
async def begin(self) -> AsyncIterator[SearchConnection]:
""" Create a new connection with automatic transaction handling.
""" This class provides a thin synchronous wrapper around the asynchronous
Nominatim functions. It creates its own event loop and runs each
synchronous function call to completion using that loop.
+
+ This class should usually be used as a context manager in a 'with' statement.
"""
def __init__(self, project_dir: Path,
This function also closes the asynchronous worker loop making
the NominatimAPI object unusable.
"""
- self._loop.run_until_complete(self._async_api.close())
- self._loop.close()
+ if not self._loop.is_closed():
+ self._loop.run_until_complete(self._async_api.close())
+ self._loop.close()
+
+
+ def __enter__(self) -> 'NominatimAPI':
+ return self
+
+
+ def __exit__(self, *_: Any) -> None:
+ self.close()
@property
"""
Helper classes and functions for formatting results into API responses.
"""
-from typing import Type, TypeVar, Dict, List, Callable, Any, Mapping
+from typing import Type, TypeVar, Dict, List, Callable, Any, Mapping, Optional, cast
from collections import defaultdict
+from pathlib import Path
+import importlib
+
+from .server.content_types import CONTENT_JSON
T = TypeVar('T') # pylint: disable=invalid-name
FormatFunc = Callable[[T, Mapping[str, Any]], str]
+ErrorFormatFunc = Callable[[str, str, int], str]
class FormatDispatcher:
- """ Helper class to conveniently create formatting functions in
- a module using decorators.
+ """ Container for formatting functions for results.
+ Functions can conveniently be added using the provided decorators.
"""
- def __init__(self) -> None:
+ def __init__(self, content_types: Optional[Mapping[str, str]] = None) -> None:
+ self.error_handler: ErrorFormatFunc = lambda ct, msg, status: f"ERROR {status}: {msg}"
+ self.content_types: Dict[str, str] = {}
+ if content_types:
+ self.content_types.update(content_types)
self.format_functions: Dict[Type[Any], Dict[str, FormatFunc[Any]]] = defaultdict(dict)
return decorator
+ def error_format_func(self, func: ErrorFormatFunc) -> ErrorFormatFunc:
+ """ Decorator for a function that formats error messges.
+ There is only one error formatter per dispatcher. Using
+ the decorator repeatedly will overwrite previous functions.
+ """
+ self.error_handler = func
+ return func
+
+
def list_formats(self, result_type: Type[Any]) -> List[str]:
""" Return a list of formats supported by this formatter.
"""
`list_formats()`.
"""
return self.format_functions[type(result)][fmt](result, options)
+
+
+ def format_error(self, content_type: str, msg: str, status: int) -> str:
+ """ Convert the given error message into a response string
+ taking the requested content_type into account.
+
+ Change the format using the error_format_func decorator.
+ """
+ return self.error_handler(content_type, msg, status)
+
+
+ def set_content_type(self, fmt: str, content_type: str) -> None:
+ """ Set the content type for the given format. This is the string
+ that will be returned in the Content-Type header of the HTTP
+ response when the given format is chosen.
+ """
+ self.content_types[fmt] = content_type
+
+
+ def get_content_type(self, fmt: str) -> str:
+ """ Return the content type for the given format.
+
+ If no explicit content type has been defined, then
+ JSON format is assumed.
+ """
+ return self.content_types.get(fmt, CONTENT_JSON)
+
+
+def load_format_dispatcher(api_name: str, project_dir: Optional[Path]) -> FormatDispatcher:
+ """ Load the dispatcher for the given API.
+
+ The function first tries to find a module api/<api_name>/format.py
+ in the project directory. This file must export a single variable
+ `dispatch`.
+
+ If the file does not exist, the default formatter is loaded.
+ """
+ if project_dir is not None:
+ priv_module = project_dir / 'api' / api_name / 'format.py'
+ if priv_module.is_file():
+ spec = importlib.util.spec_from_file_location(f'api.{api_name}.format',
+ str(priv_module))
+ if spec:
+ module = importlib.util.module_from_spec(spec)
+ # Do not add to global modules because there is no standard
+ # module name that Python can resolve.
+ assert spec.loader is not None
+ spec.loader.exec_module(module)
+
+ return cast(FormatDispatcher, module.dispatch)
+
+ return cast(FormatDispatcher,
+ importlib.import_module(f'nominatim_api.{api_name}.format').dispatch)
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2024 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Base abstraction for server implementations based on different ASGI frameworks.
+"""
+from typing import Optional, Any, NoReturn, Callable
+import abc
+import math
+
+from ..config import Configuration
+from ..core import NominatimAPIAsync
+from ..result_formatting import FormatDispatcher
+from .content_types import CONTENT_TEXT
+
+class ASGIAdaptor(abc.ABC):
+ """ Adapter class for the different ASGI frameworks.
+ Wraps functionality over concrete requests and responses.
+ """
+ content_type: str = CONTENT_TEXT
+
+
+ @abc.abstractmethod
+ def get(self, name: str, default: Optional[str] = None) -> Optional[str]:
+ """ Return an input parameter as a string. If the parameter was
+ not provided, return the 'default' value.
+ """
+
+ @abc.abstractmethod
+ def get_header(self, name: str, default: Optional[str] = None) -> Optional[str]:
+ """ Return a HTTP header parameter as a string. If the parameter was
+ not provided, return the 'default' value.
+ """
+
+
+ @abc.abstractmethod
+ def error(self, msg: str, status: int = 400) -> Exception:
+ """ Construct an appropriate exception from the given error message.
+ The exception must result in a HTTP error with the given status.
+ """
+
+
+ @abc.abstractmethod
+ def create_response(self, status: int, output: str, num_results: int) -> Any:
+ """ Create a response from the given parameters. The result will
+ be returned by the endpoint functions. The adaptor may also
+ return None when the response is created internally by some
+ other means.
+
+ The response must return the given HTTP status code 'status', set
+ the HTTP content-type header to the string provided and the
+ body of the response to 'output'.
+ """
+
+
+ @abc.abstractmethod
+ def base_uri(self) -> str:
+ """ Return the URI of the original request.
+ """
+
+
+ @abc.abstractmethod
+ def config(self) -> Configuration:
+ """ Return the current configuration object.
+ """
+
+
+ @abc.abstractmethod
+ def formatting(self) -> FormatDispatcher:
+ """ Return the formatting object to use.
+ """
+
+
+ def get_int(self, name: str, default: Optional[int] = None) -> int:
+ """ Return an input parameter as an int. Raises an exception if
+ the parameter is given but not in an integer format.
+
+ If 'default' is given, then it will be returned when the parameter
+ is missing completely. When 'default' is None, an error will be
+ raised on a missing parameter.
+ """
+ value = self.get(name)
+
+ if value is None:
+ if default is not None:
+ return default
+
+ self.raise_error(f"Parameter '{name}' missing.")
+
+ try:
+ intval = int(value)
+ except ValueError:
+ self.raise_error(f"Parameter '{name}' must be a number.")
+
+ return intval
+
+
+ def get_float(self, name: str, default: Optional[float] = None) -> float:
+ """ Return an input parameter as a flaoting-point number. Raises an
+ exception if the parameter is given but not in an float format.
+
+ If 'default' is given, then it will be returned when the parameter
+ is missing completely. When 'default' is None, an error will be
+ raised on a missing parameter.
+ """
+ value = self.get(name)
+
+ if value is None:
+ if default is not None:
+ return default
+
+ self.raise_error(f"Parameter '{name}' missing.")
+
+ try:
+ fval = float(value)
+ except ValueError:
+ self.raise_error(f"Parameter '{name}' must be a number.")
+
+ if math.isnan(fval) or math.isinf(fval):
+ self.raise_error(f"Parameter '{name}' must be a number.")
+
+ return fval
+
+
+ def get_bool(self, name: str, default: Optional[bool] = None) -> bool:
+ """ Return an input parameter as bool. Only '0' is accepted as
+ an input for 'false' all other inputs will be interpreted as 'true'.
+
+ If 'default' is given, then it will be returned when the parameter
+ is missing completely. When 'default' is None, an error will be
+ raised on a missing parameter.
+ """
+ value = self.get(name)
+
+ if value is None:
+ if default is not None:
+ return default
+
+ self.raise_error(f"Parameter '{name}' missing.")
+
+ return value != '0'
+
+
+ def raise_error(self, msg: str, status: int = 400) -> NoReturn:
+ """ Raise an exception resulting in the given HTTP status and
+ message. The message will be formatted according to the
+ output format chosen by the request.
+ """
+ raise self.error(self.formatting().format_error(self.content_type, msg, status),
+ status)
+
+
+EndpointFunc = Callable[[NominatimAPIAsync, ASGIAdaptor], Any]
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2024 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Constants for various content types for server responses.
+"""
+
+CONTENT_TEXT = 'text/plain; charset=utf-8'
+CONTENT_XML = 'text/xml; charset=utf-8'
+CONTENT_HTML = 'text/html; charset=utf-8'
+CONTENT_JSON = 'application/json; charset=utf-8'
from ...config import Configuration
from ...core import NominatimAPIAsync
from ... import v1 as api_impl
+from ...result_formatting import FormatDispatcher, load_format_dispatcher
from ... import logging as loglib
+from ..asgi_adaptor import ASGIAdaptor, EndpointFunc
class HTTPNominatimError(Exception):
""" A special exception class for errors raised during processing.
resp.content_type = 'text/plain; charset=utf-8'
-class ParamWrapper(api_impl.ASGIAdaptor):
+class ParamWrapper(ASGIAdaptor):
""" Adaptor class for server glue to Falcon framework.
"""
def __init__(self, req: Request, resp: Response,
- config: Configuration) -> None:
+ config: Configuration, formatter: FormatDispatcher) -> None:
self.request = req
self.response = resp
self._config = config
+ self._formatter = formatter
def get(self, name: str, default: Optional[str] = None) -> Optional[str]:
def config(self) -> Configuration:
return self._config
+ def formatting(self) -> FormatDispatcher:
+ return self._formatter
+
class EndpointWrapper:
""" Converter for server glue endpoint functions to Falcon request handlers.
"""
- def __init__(self, name: str, func: api_impl.EndpointFunc, api: NominatimAPIAsync) -> None:
+ def __init__(self, name: str, func: EndpointFunc, api: NominatimAPIAsync,
+ formatter: FormatDispatcher) -> None:
self.name = name
self.func = func
self.api = api
+ self.formatter = formatter
async def on_get(self, req: Request, resp: Response) -> None:
""" Implementation of the endpoint.
"""
- await self.func(self.api, ParamWrapper(req, resp, self.api.config))
+ await self.func(self.api, ParamWrapper(req, resp, self.api.config,
+ self.formatter))
class FileLoggingMiddleware:
app.add_error_handler(asyncio.TimeoutError, timeout_error_handler)
legacy_urls = api.config.get_bool('SERVE_LEGACY_URLS')
+ formatter = load_format_dispatcher('v1', project_dir)
for name, func in api_impl.ROUTES:
- endpoint = EndpointWrapper(name, func, api)
+ endpoint = EndpointWrapper(name, func, api, formatter)
app.add_route(f"/{name}", endpoint)
if legacy_urls:
app.add_route(f"/{name}.php", endpoint)
from ...config import Configuration
from ...core import NominatimAPIAsync
from ... import v1 as api_impl
+from ...result_formatting import FormatDispatcher, load_format_dispatcher
+from ..asgi_adaptor import ASGIAdaptor, EndpointFunc
from ... import logging as loglib
-class ParamWrapper(api_impl.ASGIAdaptor):
+class ParamWrapper(ASGIAdaptor):
""" Adaptor class for server glue to Starlette framework.
"""
return cast(Configuration, self.request.app.state.API.config)
-def _wrap_endpoint(func: api_impl.EndpointFunc)\
+ def formatting(self) -> FormatDispatcher:
+ return cast(FormatDispatcher, self.request.app.state.formatter)
+
+
+def _wrap_endpoint(func: EndpointFunc)\
-> Callable[[Request], Coroutine[Any, Any, Response]]:
async def _callback(request: Request) -> Response:
return cast(Response, await func(request.app.state.API, ParamWrapper(request)))
on_shutdown=[_shutdown])
app.state.API = NominatimAPIAsync(project_dir, environ)
+ app.state.formatter = load_format_dispatcher('v1', project_dir)
return app
#pylint: disable=useless-import-alias
-from .server_glue import (ASGIAdaptor as ASGIAdaptor,
- EndpointFunc as EndpointFunc,
- ROUTES as ROUTES)
-
-from . import format as _format
-
-list_formats = _format.dispatch.list_formats
-supports_format = _format.dispatch.supports_format
-format_result = _format.dispatch.format_result
+from .server_glue import ROUTES as ROUTES
from ..result_formatting import FormatDispatcher
from .classtypes import ICONS
from . import format_json, format_xml
+from .. import logging as loglib
+from ..server import content_types as ct
class RawDataList(List[Dict[str, Any]]):
""" Data type for formatting raw data lists 'as is' in json.
"""
-dispatch = FormatDispatcher()
+dispatch = FormatDispatcher({'text': ct.CONTENT_TEXT,
+ 'xml': ct.CONTENT_XML,
+ 'debug': ct.CONTENT_HTML})
+
+@dispatch.error_format_func
+def _format_error(content_type: str, msg: str, status: int) -> str:
+ if content_type == ct.CONTENT_XML:
+ return f"""<?xml version="1.0" encoding="UTF-8" ?>
+ <error>
+ <code>{status}</code>
+ <message>{msg}</message>
+ </error>
+ """
+
+ if content_type == ct.CONTENT_JSON:
+ return f"""{{"error":{{"code":{status},"message":"{msg}"}}}}"""
+
+ if content_type == ct.CONTENT_HTML:
+ loglib.log().section('Execution error')
+ loglib.log().var_dump('Status', status)
+ loglib.log().var_dump('Message', msg)
+ return loglib.get_and_disable()
+
+ return f"ERROR {status}: {msg}"
+
@dispatch.format_func(StatusResult, 'text')
def _format_status_text(result: StatusResult, _: Mapping[str, Any]) -> str:
Generic part of the server implementation of the v1 API.
Combine with the scaffolding provided for the various Python ASGI frameworks.
"""
-from typing import Optional, Any, Type, Callable, NoReturn, Dict, cast
+from typing import Optional, Any, Type, Dict, cast
from functools import reduce
-import abc
import dataclasses
-import math
from urllib.parse import urlencode
import sqlalchemy as sa
from ..errors import UsageError
-from ..config import Configuration
from .. import logging as loglib
from ..core import NominatimAPIAsync
-from .format import dispatch as formatting
from .format import RawDataList
from ..types import DataLayer, GeometryFormat, PlaceRef, PlaceID, OsmID, Point
from ..status import StatusResult
from ..results import DetailedResult, ReverseResults, SearchResult, SearchResults
from ..localization import Locales
from . import helpers
+from ..server import content_types as ct
+from ..server.asgi_adaptor import ASGIAdaptor
-CONTENT_TEXT = 'text/plain; charset=utf-8'
-CONTENT_XML = 'text/xml; charset=utf-8'
-CONTENT_HTML = 'text/html; charset=utf-8'
-CONTENT_JSON = 'application/json; charset=utf-8'
-
-CONTENT_TYPE = {'text': CONTENT_TEXT, 'xml': CONTENT_XML, 'debug': CONTENT_HTML}
-
-class ASGIAdaptor(abc.ABC):
- """ Adapter class for the different ASGI frameworks.
- Wraps functionality over concrete requests and responses.
+def build_response(adaptor: ASGIAdaptor, output: str, status: int = 200,
+ num_results: int = 0) -> Any:
+ """ Create a response from the given output. Wraps a JSONP function
+ around the response, if necessary.
"""
- content_type: str = CONTENT_TEXT
-
- @abc.abstractmethod
- def get(self, name: str, default: Optional[str] = None) -> Optional[str]:
- """ Return an input parameter as a string. If the parameter was
- not provided, return the 'default' value.
- """
-
- @abc.abstractmethod
- def get_header(self, name: str, default: Optional[str] = None) -> Optional[str]:
- """ Return a HTTP header parameter as a string. If the parameter was
- not provided, return the 'default' value.
- """
-
-
- @abc.abstractmethod
- def error(self, msg: str, status: int = 400) -> Exception:
- """ Construct an appropriate exception from the given error message.
- The exception must result in a HTTP error with the given status.
- """
-
-
- @abc.abstractmethod
- def create_response(self, status: int, output: str, num_results: int) -> Any:
- """ Create a response from the given parameters. The result will
- be returned by the endpoint functions. The adaptor may also
- return None when the response is created internally with some
- different means.
-
- The response must return the HTTP given status code 'status', set
- the HTTP content-type headers to the string provided and the
- body of the response to 'output'.
- """
-
- @abc.abstractmethod
- def base_uri(self) -> str:
- """ Return the URI of the original request.
- """
-
-
- @abc.abstractmethod
- def config(self) -> Configuration:
- """ Return the current configuration object.
- """
-
-
- def build_response(self, output: str, status: int = 200, num_results: int = 0) -> Any:
- """ Create a response from the given output. Wraps a JSONP function
- around the response, if necessary.
- """
- if self.content_type == CONTENT_JSON and status == 200:
- jsonp = self.get('json_callback')
- if jsonp is not None:
- if any(not part.isidentifier() for part in jsonp.split('.')):
- self.raise_error('Invalid json_callback value')
- output = f"{jsonp}({output})"
- self.content_type = 'application/javascript; charset=utf-8'
-
- return self.create_response(status, output, num_results)
-
-
- def raise_error(self, msg: str, status: int = 400) -> NoReturn:
- """ Raise an exception resulting in the given HTTP status and
- message. The message will be formatted according to the
- output format chosen by the request.
- """
- if self.content_type == CONTENT_XML:
- msg = f"""<?xml version="1.0" encoding="UTF-8" ?>
- <error>
- <code>{status}</code>
- <message>{msg}</message>
- </error>
- """
- elif self.content_type == CONTENT_JSON:
- msg = f"""{{"error":{{"code":{status},"message":"{msg}"}}}}"""
- elif self.content_type == CONTENT_HTML:
- loglib.log().section('Execution error')
- loglib.log().var_dump('Status', status)
- loglib.log().var_dump('Message', msg)
- msg = loglib.get_and_disable()
-
- raise self.error(msg, status)
-
-
- def get_int(self, name: str, default: Optional[int] = None) -> int:
- """ Return an input parameter as an int. Raises an exception if
- the parameter is given but not in an integer format.
-
- If 'default' is given, then it will be returned when the parameter
- is missing completely. When 'default' is None, an error will be
- raised on a missing parameter.
- """
- value = self.get(name)
-
- if value is None:
- if default is not None:
- return default
-
- self.raise_error(f"Parameter '{name}' missing.")
-
- try:
- intval = int(value)
- except ValueError:
- self.raise_error(f"Parameter '{name}' must be a number.")
-
- return intval
-
-
- def get_float(self, name: str, default: Optional[float] = None) -> float:
- """ Return an input parameter as a flaoting-point number. Raises an
- exception if the parameter is given but not in an float format.
-
- If 'default' is given, then it will be returned when the parameter
- is missing completely. When 'default' is None, an error will be
- raised on a missing parameter.
- """
- value = self.get(name)
-
- if value is None:
- if default is not None:
- return default
+ if adaptor.content_type == ct.CONTENT_JSON and status == 200:
+ jsonp = adaptor.get('json_callback')
+ if jsonp is not None:
+ if any(not part.isidentifier() for part in jsonp.split('.')):
+ adaptor.raise_error('Invalid json_callback value')
+ output = f"{jsonp}({output})"
+ adaptor.content_type = 'application/javascript; charset=utf-8'
- self.raise_error(f"Parameter '{name}' missing.")
+ return adaptor.create_response(status, output, num_results)
- try:
- fval = float(value)
- except ValueError:
- self.raise_error(f"Parameter '{name}' must be a number.")
- if math.isnan(fval) or math.isinf(fval):
- self.raise_error(f"Parameter '{name}' must be a number.")
-
- return fval
-
-
- def get_bool(self, name: str, default: Optional[bool] = None) -> bool:
- """ Return an input parameter as bool. Only '0' is accepted as
- an input for 'false' all other inputs will be interpreted as 'true'.
-
- If 'default' is given, then it will be returned when the parameter
- is missing completely. When 'default' is None, an error will be
- raised on a missing parameter.
- """
- value = self.get(name)
-
- if value is None:
- if default is not None:
- return default
-
- self.raise_error(f"Parameter '{name}' missing.")
+def get_accepted_languages(adaptor: ASGIAdaptor) -> str:
+ """ Return the accepted languages.
+ """
+ return adaptor.get('accept-language')\
+ or adaptor.get_header('accept-language')\
+ or adaptor.config().DEFAULT_LANGUAGE
- return value != '0'
+def setup_debugging(adaptor: ASGIAdaptor) -> bool:
+ """ Set up collection of debug information if requested.
- def get_accepted_languages(self) -> str:
- """ Return the accepted languages.
- """
- return self.get('accept-language')\
- or self.get_header('accept-language')\
- or self.config().DEFAULT_LANGUAGE
+ Return True when debugging was requested.
+ """
+ if adaptor.get_bool('debug', False):
+ loglib.set_log_output('html')
+ adaptor.content_type = ct.CONTENT_HTML
+ return True
+ return False
- def setup_debugging(self) -> bool:
- """ Set up collection of debug information if requested.
- Return True when debugging was requested.
- """
- if self.get_bool('debug', False):
- loglib.set_log_output('html')
- self.content_type = CONTENT_HTML
- return True
+def get_layers(adaptor: ASGIAdaptor) -> Optional[DataLayer]:
+ """ Return a parsed version of the layer parameter.
+ """
+ param = adaptor.get('layer', None)
+ if param is None:
+ return None
- return False
+ return cast(DataLayer,
+ reduce(DataLayer.__or__,
+ (getattr(DataLayer, s.upper()) for s in param.split(','))))
- def get_layers(self) -> Optional[DataLayer]:
- """ Return a parsed version of the layer parameter.
- """
- param = self.get('layer', None)
- if param is None:
- return None
+def parse_format(adaptor: ASGIAdaptor, result_type: Type[Any], default: str) -> str:
+ """ Get and check the 'format' parameter and prepare the formatter.
+ `result_type` is the type of result to be returned by the function
+ and `default` the format value to assume when no parameter is present.
+ """
+ fmt = adaptor.get('format', default=default)
+ assert fmt is not None
- return cast(DataLayer,
- reduce(DataLayer.__or__,
- (getattr(DataLayer, s.upper()) for s in param.split(','))))
+ formatting = adaptor.formatting()
+ if not formatting.supports_format(result_type, fmt):
+ adaptor.raise_error("Parameter 'format' must be one of: " +
+ ', '.join(formatting.list_formats(result_type)))
- def parse_format(self, result_type: Type[Any], default: str) -> str:
- """ Get and check the 'format' parameter and prepare the formatter.
- `result_type` is the type of result to be returned by the function
- and `default` the format value to assume when no parameter is present.
- """
- fmt = self.get('format', default=default)
- assert fmt is not None
+ adaptor.content_type = formatting.get_content_type(fmt)
+ return fmt
- if not formatting.supports_format(result_type, fmt):
- self.raise_error("Parameter 'format' must be one of: " +
- ', '.join(formatting.list_formats(result_type)))
- self.content_type = CONTENT_TYPE.get(fmt, CONTENT_JSON)
- return fmt
+def parse_geometry_details(adaptor: ASGIAdaptor, fmt: str) -> Dict[str, Any]:
+ """ Create details structure from the supplied geometry parameters.
+ """
+ numgeoms = 0
+ output = GeometryFormat.NONE
+ if adaptor.get_bool('polygon_geojson', False):
+ output |= GeometryFormat.GEOJSON
+ numgeoms += 1
+ if fmt not in ('geojson', 'geocodejson'):
+ if adaptor.get_bool('polygon_text', False):
+ output |= GeometryFormat.TEXT
+ numgeoms += 1
+ if adaptor.get_bool('polygon_kml', False):
+ output |= GeometryFormat.KML
+ numgeoms += 1
+ if adaptor.get_bool('polygon_svg', False):
+ output |= GeometryFormat.SVG
+ numgeoms += 1
+ if numgeoms > adaptor.config().get_int('POLYGON_OUTPUT_MAX_TYPES'):
+ adaptor.raise_error('Too many polygon output options selected.')
- def parse_geometry_details(self, fmt: str) -> Dict[str, Any]:
- """ Create details structure from the supplied geometry parameters.
- """
- numgeoms = 0
- output = GeometryFormat.NONE
- if self.get_bool('polygon_geojson', False):
- output |= GeometryFormat.GEOJSON
- numgeoms += 1
- if fmt not in ('geojson', 'geocodejson'):
- if self.get_bool('polygon_text', False):
- output |= GeometryFormat.TEXT
- numgeoms += 1
- if self.get_bool('polygon_kml', False):
- output |= GeometryFormat.KML
- numgeoms += 1
- if self.get_bool('polygon_svg', False):
- output |= GeometryFormat.SVG
- numgeoms += 1
-
- if numgeoms > self.config().get_int('POLYGON_OUTPUT_MAX_TYPES'):
- self.raise_error('Too many polygon output options selected.')
-
- return {'address_details': True,
- 'geometry_simplification': self.get_float('polygon_threshold', 0.0),
- 'geometry_output': output
- }
+ return {'address_details': True,
+ 'geometry_simplification': adaptor.get_float('polygon_threshold', 0.0),
+ 'geometry_output': output
+ }
async def status_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
"""
result = await api.status()
- fmt = params.parse_format(StatusResult, 'text')
+ fmt = parse_format(params, StatusResult, 'text')
if fmt == 'text' and result.status:
status_code = 500
else:
status_code = 200
- return params.build_response(formatting.format_result(result, fmt, {}),
+ return build_response(params, params.formatting().format_result(result, fmt, {}),
status=status_code)
async def details_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /details endpoint. See API docs for details.
"""
- fmt = params.parse_format(DetailedResult, 'json')
+ fmt = parse_format(params, DetailedResult, 'json')
place_id = params.get_int('place_id', 0)
place: PlaceRef
if place_id:
params.raise_error("Missing ID parameter 'place_id' or 'osmtype'.")
place = OsmID(osmtype, params.get_int('osmid'), params.get('class'))
- debug = params.setup_debugging()
+ debug = setup_debugging(params)
- locales = Locales.from_accept_languages(params.get_accepted_languages())
+ locales = Locales.from_accept_languages(get_accepted_languages(params))
result = await api.details(place,
address_details=params.get_bool('addressdetails', False),
)
if debug:
- return params.build_response(loglib.get_and_disable())
+ return build_response(params, loglib.get_and_disable())
if result is None:
params.raise_error('No place with that OSM ID found.', status=404)
- output = formatting.format_result(result, fmt,
+ output = params.formatting().format_result(result, fmt,
{'locales': locales,
'group_hierarchy': params.get_bool('group_hierarchy', False),
'icon_base_url': params.config().MAPICON_URL})
- return params.build_response(output, num_results=1)
+ return build_response(params, output, num_results=1)
async def reverse_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /reverse endpoint. See API docs for details.
"""
- fmt = params.parse_format(ReverseResults, 'xml')
- debug = params.setup_debugging()
+ fmt = parse_format(params, ReverseResults, 'xml')
+ debug = setup_debugging(params)
coord = Point(params.get_float('lon'), params.get_float('lat'))
- details = params.parse_geometry_details(fmt)
+ details = parse_geometry_details(params, fmt)
details['max_rank'] = helpers.zoom_to_rank(params.get_int('zoom', 18))
- details['layers'] = params.get_layers()
- details['locales'] = Locales.from_accept_languages(params.get_accepted_languages())
+ details['layers'] = get_layers(params)
+ details['locales'] = Locales.from_accept_languages(get_accepted_languages(params))
result = await api.reverse(coord, **details)
if debug:
- return params.build_response(loglib.get_and_disable(), num_results=1 if result else 0)
+ return build_response(params, loglib.get_and_disable(), num_results=1 if result else 0)
if fmt == 'xml':
queryparts = {'lat': str(coord.lat), 'lon': str(coord.lon), 'format': 'xml'}
'namedetails': params.get_bool('namedetails', False),
'addressdetails': params.get_bool('addressdetails', True)}
- output = formatting.format_result(ReverseResults([result] if result else []),
- fmt, fmt_options)
+ output = params.formatting().format_result(ReverseResults([result] if result else []),
+ fmt, fmt_options)
- return params.build_response(output, num_results=1 if result else 0)
+ return build_response(params, output, num_results=1 if result else 0)
async def lookup_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /lookup endpoint. See API docs for details.
"""
- fmt = params.parse_format(SearchResults, 'xml')
- debug = params.setup_debugging()
- details = params.parse_geometry_details(fmt)
- details['locales'] = Locales.from_accept_languages(params.get_accepted_languages())
+ fmt = parse_format(params, SearchResults, 'xml')
+ debug = setup_debugging(params)
+ details = parse_geometry_details(params, fmt)
+ details['locales'] = Locales.from_accept_languages(get_accepted_languages(params))
places = []
for oid in (params.get('osm_ids') or '').split(','):
results = SearchResults()
if debug:
- return params.build_response(loglib.get_and_disable(), num_results=len(results))
+ return build_response(params, loglib.get_and_disable(), num_results=len(results))
fmt_options = {'extratags': params.get_bool('extratags', False),
'namedetails': params.get_bool('namedetails', False),
'addressdetails': params.get_bool('addressdetails', True)}
- output = formatting.format_result(results, fmt, fmt_options)
+ output = params.formatting().format_result(results, fmt, fmt_options)
- return params.build_response(output, num_results=len(results))
+ return build_response(params, output, num_results=len(results))
async def _unstructured_search(query: str, api: NominatimAPIAsync,
async def search_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /search endpoint. See API docs for details.
"""
- fmt = params.parse_format(SearchResults, 'jsonv2')
- debug = params.setup_debugging()
- details = params.parse_geometry_details(fmt)
+ fmt = parse_format(params, SearchResults, 'jsonv2')
+ debug = setup_debugging(params)
+ details = parse_geometry_details(params, fmt)
details['countries'] = params.get('countrycodes', None)
details['excluded'] = params.get('exclude_place_ids', None)
if params.get('featureType', None) is not None:
details['layers'] = DataLayer.ADDRESS
else:
- details['layers'] = params.get_layers()
+ details['layers'] = get_layers(params)
- details['locales'] = Locales.from_accept_languages(params.get_accepted_languages())
+ details['locales'] = Locales.from_accept_languages(get_accepted_languages(params))
# unstructured query parameters
query = params.get('q', None)
results = helpers.deduplicate_results(results, max_results)
if debug:
- return params.build_response(loglib.get_and_disable(), num_results=len(results))
+ return build_response(params, loglib.get_and_disable(), num_results=len(results))
if fmt == 'xml':
helpers.extend_query_parts(queryparts, details,
'namedetails': params.get_bool('namedetails', False),
'addressdetails': params.get_bool('addressdetails', False)}
- output = formatting.format_result(results, fmt, fmt_options)
+ output = params.formatting().format_result(results, fmt, fmt_options)
- return params.build_response(output, num_results=len(results))
+ return build_response(params, output, num_results=len(results))
async def deletable_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
deleted or are broken in the OSM data but are kept in the
Nominatim database to minimize disruption.
"""
- fmt = params.parse_format(RawDataList, 'json')
+ fmt = parse_format(params, RawDataList, 'json')
async with api.begin() as conn:
sql = sa.text(""" SELECT p.place_id, country_code,
""")
results = RawDataList(r._asdict() for r in await conn.execute(sql))
- return params.build_response(formatting.format_result(results, fmt, {}))
+ return build_response(params, params.formatting().format_result(results, fmt, {}))
async def polygons_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
their size but are kept in the Nominatim database with their
old area to minimize disruption.
"""
- fmt = params.parse_format(RawDataList, 'json')
+ fmt = parse_format(params, RawDataList, 'json')
sql_params: Dict[str, Any] = {
'days': params.get_int('days', -1),
'cls': params.get('class')
results = RawDataList(r._asdict() for r in await conn.execute(sql, sql_params))
- return params.build_response(formatting.format_result(results, fmt, {}))
-
+ return build_response(params, params.formatting().format_result(results, fmt, {}))
-EndpointFunc = Callable[[NominatimAPIAsync, ASGIAdaptor], Any]
ROUTES = [
('status', status_endpoint),
log.warning('Using project directory: %s', str(args.project_dir))
try:
- return args.command.run(args)
+ ret = args.command.run(args)
+
+ if args.config.TOKENIZER == 'legacy':
+ log.warning('WARNING: the "legacy" tokenizer is deprecated '
+ 'and will be removed in Nominatim 5.0.')
+
+ return ret
except UsageError as exception:
if log.isEnabledFor(logging.DEBUG):
raise # use Python's exception printing
if args.engine == 'php':
if args.config.lib_dir.php is None:
raise UsageError("PHP frontend not configured.")
+ LOG.warning('\n\nWARNING: the PHP frontend is deprecated '
+ 'and will be removed in Nominatim 5.0.\n\n')
run_php_server(args.server, args.project_dir / 'website')
else:
asyncio.run(self.run_uvicorn(args))
raise ex
parser.parser.epilog = \
- '\n\nNominatim API package not found. The following commands are not available:'\
+ f'\n\nNominatim API package not found (was looking for module: {ex.name}).'\
+ '\nThe following commands are not available:'\
'\n export, convert, serve, search, reverse, lookup, details, status'\
"\n\nRun 'pip install nominatim-api' to install the package."
"""
Subcommand definitions for API calls from the command line.
"""
-from typing import Dict, Any, Optional
+from typing import Dict, Any, Optional, Type, Mapping
import argparse
import logging
import json
from functools import reduce
import nominatim_api as napi
-import nominatim_api.v1 as api_output
from nominatim_api.v1.helpers import zoom_to_rank, deduplicate_results
-from nominatim_api.v1.format import dispatch as formatting
+from nominatim_api.server.content_types import CONTENT_JSON
import nominatim_api.logging as loglib
from ..errors import UsageError
from .args import NominatimArgs
('namedetails', 'Include a list of alternative names')
)
+def _add_list_format(parser: argparse.ArgumentParser) -> None:
+ group = parser.add_argument_group('Other options')
+ group.add_argument('--list-formats', action='store_true',
+ help='List supported output formats and exit.')
+
+
def _add_api_output_arguments(parser: argparse.ArgumentParser) -> None:
- group = parser.add_argument_group('Output arguments')
- group.add_argument('--format', default='jsonv2',
- choices=formatting.list_formats(napi.SearchResults) + ['debug'],
- help='Format of result')
+ group = parser.add_argument_group('Output formatting')
+ group.add_argument('--format', type=str, default='jsonv2',
+ help='Format of result (use --list-formats to see supported formats)')
for name, desc in EXTRADATA_PARAMS:
group.add_argument('--' + name, action='store_true', help=desc)
(napi.DataLayer[s.upper()] for s in args.layers))
+def _list_formats(formatter: napi.FormatDispatcher, rtype: Type[Any]) -> int:
+ for fmt in formatter.list_formats(rtype):
+ print(fmt)
+ print('debug')
+
+ return 0
+
+
+def _print_output(formatter: napi.FormatDispatcher, result: Any,
+ fmt: str, options: Mapping[str, Any]) -> None:
+ output = formatter.format_result(result, fmt, options)
+ if formatter.get_content_type(fmt) == CONTENT_JSON:
+ # reformat the result, so it is pretty-printed
+ try:
+ json.dump(json.loads(output), sys.stdout, indent=4, ensure_ascii=False)
+ except json.decoder.JSONDecodeError as err:
+ # Catch the error here, so that the data can be debugged
+ # when people are developing custom result formatters.
+ LOG.fatal("Parsing json failed: %s\nUnformatted output:\n%s", err, output)
+ else:
+ sys.stdout.write(output)
+ sys.stdout.write('\n')
+
+
class APISearch:
"""\
Execute a search query.
help='Preferred area to find search results')
group.add_argument('--bounded', action='store_true',
help='Strictly restrict results to viewbox area')
-
- group = parser.add_argument_group('Other arguments')
group.add_argument('--no-dedupe', action='store_false', dest='dedupe',
help='Do not remove duplicates from the result list')
+ _add_list_format(parser)
def run(self, args: NominatimArgs) -> int:
+ formatter = napi.load_format_dispatcher('v1', args.project_dir)
+
+ if args.list_formats:
+ return _list_formats(formatter, napi.SearchResults)
+
if args.format == 'debug':
loglib.set_log_output('text')
-
- api = napi.NominatimAPI(args.project_dir)
-
- params: Dict[str, Any] = {'max_results': args.limit + min(args.limit, 10),
- 'address_details': True, # needed for display name
- 'geometry_output': _get_geometry_output(args),
- 'geometry_simplification': args.polygon_threshold,
- 'countries': args.countrycodes,
- 'excluded': args.exclude_place_ids,
- 'viewbox': args.viewbox,
- 'bounded_viewbox': args.bounded,
- 'locales': _get_locales(args, api.config.DEFAULT_LANGUAGE)
- }
-
- if args.query:
- results = api.search(args.query, **params)
- else:
- results = api.search_address(amenity=args.amenity,
- street=args.street,
- city=args.city,
- county=args.county,
- state=args.state,
- postalcode=args.postalcode,
- country=args.country,
- **params)
+ elif not formatter.supports_format(napi.SearchResults, args.format):
+ raise UsageError(f"Unsupported format '{args.format}'. "
+ 'Use --list-formats to see supported formats.')
+
+ try:
+ with napi.NominatimAPI(args.project_dir) as api:
+ params: Dict[str, Any] = {'max_results': args.limit + min(args.limit, 10),
+ 'address_details': True, # needed for display name
+ 'geometry_output': _get_geometry_output(args),
+ 'geometry_simplification': args.polygon_threshold,
+ 'countries': args.countrycodes,
+ 'excluded': args.exclude_place_ids,
+ 'viewbox': args.viewbox,
+ 'bounded_viewbox': args.bounded,
+ 'locales': _get_locales(args, api.config.DEFAULT_LANGUAGE)
+ }
+
+ if args.query:
+ results = api.search(args.query, **params)
+ else:
+ results = api.search_address(amenity=args.amenity,
+ street=args.street,
+ city=args.city,
+ county=args.county,
+ state=args.state,
+ postalcode=args.postalcode,
+ country=args.country,
+ **params)
+ except napi.UsageError as ex:
+ raise UsageError(ex) from ex
if args.dedupe and len(results) > 1:
results = deduplicate_results(results, args.limit)
print(loglib.get_and_disable())
return 0
- output = api_output.format_result(
- results,
- args.format,
- {'extratags': args.extratags,
- 'namedetails': args.namedetails,
- 'addressdetails': args.addressdetails})
- if args.format != 'xml':
- # reformat the result, so it is pretty-printed
- json.dump(json.loads(output), sys.stdout, indent=4, ensure_ascii=False)
- else:
- sys.stdout.write(output)
- sys.stdout.write('\n')
-
+ _print_output(formatter, results, args.format,
+ {'extratags': args.extratags,
+ 'namedetails': args.namedetails,
+ 'addressdetails': args.addressdetails})
return 0
def add_args(self, parser: argparse.ArgumentParser) -> None:
group = parser.add_argument_group('Query arguments')
- group.add_argument('--lat', type=float, required=True,
+ group.add_argument('--lat', type=float,
help='Latitude of coordinate to look up (in WGS84)')
- group.add_argument('--lon', type=float, required=True,
+ group.add_argument('--lon', type=float,
help='Longitude of coordinate to look up (in WGS84)')
group.add_argument('--zoom', type=int,
help='Level of detail required for the address')
help='OSM id to lookup in format <NRW><id> (may be repeated)')
_add_api_output_arguments(parser)
+ _add_list_format(parser)
def run(self, args: NominatimArgs) -> int:
- if args.format == 'debug':
- loglib.set_log_output('text')
+ formatter = napi.load_format_dispatcher('v1', args.project_dir)
- api = napi.NominatimAPI(args.project_dir)
+ if args.list_formats:
+ return _list_formats(formatter, napi.ReverseResults)
- result = api.reverse(napi.Point(args.lon, args.lat),
- max_rank=zoom_to_rank(args.zoom or 18),
- layers=_get_layers(args, napi.DataLayer.ADDRESS | napi.DataLayer.POI),
- address_details=True, # needed for display name
- geometry_output=_get_geometry_output(args),
- geometry_simplification=args.polygon_threshold,
- locales=_get_locales(args, api.config.DEFAULT_LANGUAGE))
+ if args.format == 'debug':
+ loglib.set_log_output('text')
+ elif not formatter.supports_format(napi.ReverseResults, args.format):
+ raise UsageError(f"Unsupported format '{args.format}'. "
+ 'Use --list-formats to see supported formats.')
+
+ if args.lat is None or args.lon is None:
+            raise UsageError("'lat' and 'lon' parameters are required.")
+
+ layers = _get_layers(args, napi.DataLayer.ADDRESS | napi.DataLayer.POI)
+
+ try:
+ with napi.NominatimAPI(args.project_dir) as api:
+ result = api.reverse(napi.Point(args.lon, args.lat),
+ max_rank=zoom_to_rank(args.zoom or 18),
+ layers=layers,
+ address_details=True, # needed for display name
+ geometry_output=_get_geometry_output(args),
+ geometry_simplification=args.polygon_threshold,
+ locales=_get_locales(args, api.config.DEFAULT_LANGUAGE))
+ except napi.UsageError as ex:
+ raise UsageError(ex) from ex
if args.format == 'debug':
print(loglib.get_and_disable())
return 0
if result:
- output = api_output.format_result(
- napi.ReverseResults([result]),
- args.format,
- {'extratags': args.extratags,
- 'namedetails': args.namedetails,
- 'addressdetails': args.addressdetails})
- if args.format != 'xml':
- # reformat the result, so it is pretty-printed
- json.dump(json.loads(output), sys.stdout, indent=4, ensure_ascii=False)
- else:
- sys.stdout.write(output)
- sys.stdout.write('\n')
+ _print_output(formatter, napi.ReverseResults([result]), args.format,
+ {'extratags': args.extratags,
+ 'namedetails': args.namedetails,
+ 'addressdetails': args.addressdetails})
return 0
def add_args(self, parser: argparse.ArgumentParser) -> None:
group = parser.add_argument_group('Query arguments')
group.add_argument('--id', metavar='OSMID',
- action='append', required=True, dest='ids',
+ action='append', dest='ids',
help='OSM id to lookup in format <NRW><id> (may be repeated)')
_add_api_output_arguments(parser)
+ _add_list_format(parser)
def run(self, args: NominatimArgs) -> int:
+ formatter = napi.load_format_dispatcher('v1', args.project_dir)
+
+ if args.list_formats:
+ return _list_formats(formatter, napi.ReverseResults)
+
if args.format == 'debug':
loglib.set_log_output('text')
+ elif not formatter.supports_format(napi.ReverseResults, args.format):
+ raise UsageError(f"Unsupported format '{args.format}'. "
+ 'Use --list-formats to see supported formats.')
+
+ if args.ids is None:
+            raise UsageError("'id' parameter is required.")
+
+ places = [napi.OsmID(o[0], int(o[1:])) for o in args.ids]
- api = napi.NominatimAPI(args.project_dir)
+ try:
+ with napi.NominatimAPI(args.project_dir) as api:
+ results = api.lookup(places,
+ address_details=True, # needed for display name
+ geometry_output=_get_geometry_output(args),
+ geometry_simplification=args.polygon_threshold or 0.0,
+ locales=_get_locales(args, api.config.DEFAULT_LANGUAGE))
+ except napi.UsageError as ex:
+ raise UsageError(ex) from ex
if args.format == 'debug':
print(loglib.get_and_disable())
return 0
- places = [napi.OsmID(o[0], int(o[1:])) for o in args.ids]
-
- results = api.lookup(places,
- address_details=True, # needed for display name
- geometry_output=_get_geometry_output(args),
- geometry_simplification=args.polygon_threshold or 0.0,
- locales=_get_locales(args, api.config.DEFAULT_LANGUAGE))
-
- output = api_output.format_result(
- results,
- args.format,
- {'extratags': args.extratags,
- 'namedetails': args.namedetails,
- 'addressdetails': args.addressdetails})
- if args.format != 'xml':
- # reformat the result, so it is pretty-printed
- json.dump(json.loads(output), sys.stdout, indent=4, ensure_ascii=False)
- else:
- sys.stdout.write(output)
- sys.stdout.write('\n')
-
+ _print_output(formatter, results, args.format,
+ {'extratags': args.extratags,
+ 'namedetails': args.namedetails,
+ 'addressdetails': args.addressdetails})
return 0
def add_args(self, parser: argparse.ArgumentParser) -> None:
group = parser.add_argument_group('Query arguments')
- objs = group.add_mutually_exclusive_group(required=True)
- objs.add_argument('--node', '-n', type=int,
- help="Look up the OSM node with the given ID.")
- objs.add_argument('--way', '-w', type=int,
- help="Look up the OSM way with the given ID.")
- objs.add_argument('--relation', '-r', type=int,
- help="Look up the OSM relation with the given ID.")
- objs.add_argument('--place_id', '-p', type=int,
- help='Database internal identifier of the OSM object to look up')
+ group.add_argument('--node', '-n', type=int,
+ help="Look up the OSM node with the given ID.")
+ group.add_argument('--way', '-w', type=int,
+ help="Look up the OSM way with the given ID.")
+ group.add_argument('--relation', '-r', type=int,
+ help="Look up the OSM relation with the given ID.")
+ group.add_argument('--place_id', '-p', type=int,
+ help='Database internal identifier of the OSM object to look up')
group.add_argument('--class', dest='object_class',
help=("Class type to disambiguated multiple entries "
"of the same object."))
group = parser.add_argument_group('Output arguments')
+ group.add_argument('--format', type=str, default='json',
+ help='Format of result (use --list-formats to see supported formats)')
group.add_argument('--addressdetails', action='store_true',
help='Include a breakdown of the address into elements')
group.add_argument('--keywords', action='store_true',
help='Include geometry of result')
group.add_argument('--lang', '--accept-language', metavar='LANGS',
help='Preferred language order for presenting search results')
+ _add_list_format(parser)
def run(self, args: NominatimArgs) -> int:
+ formatter = napi.load_format_dispatcher('v1', args.project_dir)
+
+ if args.list_formats:
+ return _list_formats(formatter, napi.DetailedResult)
+
+ if args.format == 'debug':
+ loglib.set_log_output('text')
+ elif not formatter.supports_format(napi.DetailedResult, args.format):
+ raise UsageError(f"Unsupported format '{args.format}'. "
+ 'Use --list-formats to see supported formats.')
+
place: napi.PlaceRef
if args.node:
place = napi.OsmID('N', args.node, args.object_class)
place = napi.OsmID('W', args.way, args.object_class)
elif args.relation:
place = napi.OsmID('R', args.relation, args.object_class)
- else:
- assert args.place_id is not None
+ elif args.place_id is not None:
place = napi.PlaceID(args.place_id)
+ else:
+ raise UsageError('One of the arguments --node/-n --way/-w '
+                             '--relation/-r --place_id/-p is required.')
+
+ try:
+ with napi.NominatimAPI(args.project_dir) as api:
+ locales = _get_locales(args, api.config.DEFAULT_LANGUAGE)
+ result = api.details(place,
+ address_details=args.addressdetails,
+ linked_places=args.linkedplaces,
+ parented_places=args.hierarchy,
+ keywords=args.keywords,
+ geometry_output=napi.GeometryFormat.GEOJSON
+ if args.polygon_geojson
+ else napi.GeometryFormat.NONE,
+ locales=locales)
+ except napi.UsageError as ex:
+ raise UsageError(ex) from ex
- api = napi.NominatimAPI(args.project_dir)
-
- locales = _get_locales(args, api.config.DEFAULT_LANGUAGE)
- result = api.details(place,
- address_details=args.addressdetails,
- linked_places=args.linkedplaces,
- parented_places=args.hierarchy,
- keywords=args.keywords,
- geometry_output=napi.GeometryFormat.GEOJSON
- if args.polygon_geojson
- else napi.GeometryFormat.NONE,
- locales=locales)
-
+ if args.format == 'debug':
+ print(loglib.get_and_disable())
+ return 0
if result:
- output = api_output.format_result(
- result,
- 'json',
- {'locales': locales,
- 'group_hierarchy': args.group_hierarchy})
- # reformat the result, so it is pretty-printed
- json.dump(json.loads(output), sys.stdout, indent=4, ensure_ascii=False)
- sys.stdout.write('\n')
-
+ _print_output(formatter, result, args.format or 'json',
+ {'locales': locales,
+ 'group_hierarchy': args.group_hierarchy})
return 0
LOG.error("Object not found in database.")
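For reference, the three kinds of place references accepted by `api.details()` above can be built as follows (the IDs are invented; the constructors are exactly the ones already used in this patch):

```python
import nominatim_api as napi

place_by_node = napi.OsmID('N', 240109189)            # OSM node
place_by_way = napi.OsmID('W', 30628993, 'highway')   # optional class for disambiguation
place_by_db_id = napi.PlaceID(85993)                  # database-internal place_id
```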
"""
def add_args(self, parser: argparse.ArgumentParser) -> None:
- formats = api_output.list_formats(napi.StatusResult)
group = parser.add_argument_group('API parameters')
- group.add_argument('--format', default=formats[0], choices=formats,
- help='Format of result')
+ group.add_argument('--format', type=str, default='text',
+ help='Format of result (use --list-formats to see supported formats)')
+ _add_list_format(parser)
def run(self, args: NominatimArgs) -> int:
- status = napi.NominatimAPI(args.project_dir).status()
- print(api_output.format_result(status, args.format, {}))
+ formatter = napi.load_format_dispatcher('v1', args.project_dir)
+
+ if args.list_formats:
+ return _list_formats(formatter, napi.StatusResult)
+
+ if args.format == 'debug':
+ loglib.set_log_output('text')
+ elif not formatter.supports_format(napi.StatusResult, args.format):
+ raise UsageError(f"Unsupported format '{args.format}'. "
+ 'Use --list-formats to see supported formats.')
+
+ try:
+ with napi.NominatimAPI(args.project_dir) as api:
+ status = api.status()
+ except napi.UsageError as ex:
+ raise UsageError(ex) from ex
+
+ if args.format == 'debug':
+ print(loglib.get_and_disable())
+ return 0
+
+ _print_output(formatter, status, args.format, {})
+
return 0
# Arguments to all query functions
format: str
+ list_formats: bool
addressdetails: bool
extratags: bool
namedetails: bool
"""
 Main workhorse for indexing (computing addresses) in the database.
"""
-from typing import cast, List, Any
+from typing import cast, List, Any, Optional
import logging
import time
LOG.warning("Starting indexing boundaries using %s threads",
self.num_threads)
+ minrank = max(minrank, 4)
+ maxrank = min(maxrank, 25)
+
+        # Precompute the number of rows to process for each rank
+ with connect(self.dsn) as conn:
+ hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
+ if hstore_info is None:
+ raise RuntimeError('Hstore extension is requested but not installed.')
+ psycopg.types.hstore.register_hstore(hstore_info)
+
+ with conn.cursor() as cur:
+ cur = conn.execute(""" SELECT rank_search, count(*)
+ FROM placex
+ WHERE rank_search between %s and %s
+ AND class = 'boundary' and type = 'administrative'
+ AND indexed_status > 0
+ GROUP BY rank_search""",
+ (minrank, maxrank))
+ total_tuples = {row.rank_search: row.count for row in cur}
+
with self.tokenizer.name_analyzer() as analyzer:
- for rank in range(max(minrank, 4), min(maxrank, 26)):
- total += await self._index(runners.BoundaryRunner(rank, analyzer))
+ for rank in range(minrank, maxrank + 1):
+ total += await self._index(runners.BoundaryRunner(rank, analyzer),
+ total_tuples=total_tuples.get(rank, 0))
return total
LOG.warning("Starting indexing rank (%i to %i) using %i threads",
minrank, maxrank, self.num_threads)
+        # Precompute the number of rows to process for each rank
+ with connect(self.dsn) as conn:
+ hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
+ if hstore_info is None:
+ raise RuntimeError('Hstore extension is requested but not installed.')
+ psycopg.types.hstore.register_hstore(hstore_info)
+
+ with conn.cursor() as cur:
+ cur = conn.execute(""" SELECT rank_address, count(*)
+ FROM placex
+ WHERE rank_address between %s and %s
+ AND indexed_status > 0
+ GROUP BY rank_address""",
+ (minrank, maxrank))
+ total_tuples = {row.rank_address: row.count for row in cur}
+
with self.tokenizer.name_analyzer() as analyzer:
for rank in range(max(1, minrank), maxrank + 1):
if rank >= 30:
batch = 5
else:
batch = 1
- total += await self._index(runners.RankRunner(rank, analyzer), batch)
+ total += await self._index(runners.RankRunner(rank, analyzer),
+ batch=batch, total_tuples=total_tuples.get(rank, 0))
if maxrank == 30:
total += await self._index(runners.RankRunner(0, analyzer))
- total += await self._index(runners.InterpolationRunner(analyzer), 20)
+ total += await self._index(runners.InterpolationRunner(analyzer), batch=20)
return total
"""
LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
- return await self._index(runners.PostcodeRunner(), 20)
+ return await self._index(runners.PostcodeRunner(), batch=20)
def update_status_table(self) -> None:
conn.commit()
- async def _index(self, runner: runners.Runner, batch: int = 1) -> int:
+ async def _index(self, runner: runners.Runner, batch: int = 1,
+ total_tuples: Optional[int] = None) -> int:
""" Index a single rank or table. `runner` describes the SQL to use
for indexing. `batch` describes the number of objects that
- should be processed with a single SQL statement
+ should be processed with a single SQL statement.
+
+ `total_tuples` may contain the total number of rows to process.
+ When not supplied, the value will be computed using the
+            appropriate runner function.
"""
LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
- total_tuples = self._prepare_indexing(runner)
+ if total_tuples is None:
+ total_tuples = self._prepare_indexing(runner)
progress = ProgressLogger(runner.name(), total_tuples)
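As the `_index()` docstring above explains, callers now hand over a precomputed row count per rank instead of letting `_prepare_indexing()` count again. A tiny illustration of that lookup, with invented numbers:

```python
# rank -> rows still waiting to be indexed, as produced by the GROUP BY queries above
total_tuples = {4: 12, 5: 230, 8: 10412}

for rank in range(4, 26):
    pending = total_tuples.get(rank, 0)   # ranks with nothing left to do report 0
    # each rank is then processed via
    # await self._index(runners.BoundaryRunner(rank, analyzer), total_tuples=pending)
```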
def create(dsn: str, data_dir: Path) -> 'LegacyTokenizer':
""" Create a new instance of the tokenizer provided by this module.
"""
+ LOG.warning('WARNING: the legacy tokenizer is deprecated '
+ 'and will be removed in Nominatim 5.0.')
return LegacyTokenizer(dsn, data_dir)
-def _install_module(config_module_path: str, src_dir: Path, module_dir: Path) -> str:
+def _install_module(config_module_path: str, src_dir: Optional[Path], module_dir: Path) -> str:
""" Copies the PostgreSQL normalisation module into the project
directory if necessary. For historical reasons the module is
saved in the '/module' subdirectory and not with the other tokenizer
LOG.info("Using custom path for database module at '%s'", config_module_path)
return config_module_path
+ # Otherwise a source dir must be given.
+ if src_dir is None:
+ raise UsageError("The legacy tokenizer cannot be used with the Nominatim pip module.")
+
# Compatibility mode for builddir installations.
if module_dir.exists() and src_dir.samefile(module_dir):
LOG.info('Running from build directory. Leaving database module as is.')
'idx_placex_rank_search',
'idx_placex_rank_address',
'idx_placex_parent_place_id',
+ 'idx_placex_geometry_reverse_lookupplacenode',
'idx_placex_geometry_reverse_lookuppolygon',
'idx_placex_geometry_placenode',
'idx_osmline_parent_place_id',
'idx_postcode_id',
'idx_postcode_postcode'
]
+
+ # These won't exist if --reverse-only import was used
if table_exists(conn, 'search_name'):
indexes.extend(('idx_search_name_nameaddress_vector',
'idx_search_name_name_vector',
if server_version_tuple(conn) >= (11, 0, 0):
indexes.extend(('idx_placex_housenumber',
'idx_osmline_parent_osm_id_with_hnr'))
+
+ # These won't exist if --no-updates import was used
if table_exists(conn, 'place'):
indexes.extend(('idx_location_area_country_place_id',
'idx_place_osm_unique',
**Hardware Configuration:**
- RAM: {friendly_memory_string(psutil.virtual_memory().total)}
- number of CPUs: {psutil.cpu_count(logical=False)}
- - bare metal/AWS/other cloud service (per systemd-detect-virt(1)): {run_command("systemd-detect-virt")}
+ - bare metal/AWS/other cloud service (per systemd-detect-virt(1)):
+ {run_command("systemd-detect-virt")}
- type and size of disks:
**`df -h` - df - report file system disk space usage: **
```
"""
Helper functions for executing external programs.
"""
-from typing import Any, Mapping
+from typing import Any, Mapping, List, Optional
import logging
import os
+import re
import subprocess
import shutil
from ..typing import StrPath
from ..db.connection import get_pg_env
+from ..errors import UsageError
+from ..version import OSM2PGSQL_REQUIRED_VERSION
LOG = logging.getLogger()
def run_osm2pgsql(options: Mapping[str, Any]) -> None:
""" Run osm2pgsql with the given options.
"""
+ _check_osm2pgsql_version(options['osm2pgsql'])
+
env = get_pg_env(options['dsn'])
cmd = [_find_osm2pgsql_cmd(options['osm2pgsql']),
+ '--append' if options['append'] else '--create',
'--slim',
'--log-progress', 'true',
'--number-processes', '1' if options['append'] else str(options['threads']),
env['LUA_PATH'] = ';'.join((str(options['osm2pgsql_style_path'] / '?.lua'),
os.environ.get('LUAPATH', ';')))
cmd.extend(('--output', 'flex'))
+
+ for flavour in ('data', 'index'):
+ if options['tablespaces'][f"main_{flavour}"]:
+ env[f"NOMINATIM_TABLESPACE_PLACE_{flavour.upper()}"] = \
+ options['tablespaces'][f"main_{flavour}"]
else:
cmd.extend(('--output', 'gazetteer', '--hstore', '--latlon'))
+ cmd.extend(_mk_tablespace_options('main', options))
- cmd.append('--append' if options['append'] else '--create')
if options['flatnode_file']:
cmd.extend(('--flat-nodes', options['flatnode_file']))
- for key, param in (('slim_data', '--tablespace-slim-data'),
- ('slim_index', '--tablespace-slim-index'),
- ('main_data', '--tablespace-main-data'),
- ('main_index', '--tablespace-main-index')):
- if options['tablespaces'][key]:
- cmd.extend((param, options['tablespaces'][key]))
-
- if options['tablespaces']['main_data']:
- env['NOMINATIM_TABLESPACE_PLACE_DATA'] = options['tablespaces']['main_data']
- if options['tablespaces']['main_index']:
- env['NOMINATIM_TABLESPACE_PLACE_INDEX'] = options['tablespaces']['main_index']
+ cmd.extend(_mk_tablespace_options('slim', options))
if options.get('disable_jit', False):
env['PGOPTIONS'] = '-c jit=off -c max_parallel_workers_per_gather=0'
env=env, check=True)
-def _find_osm2pgsql_cmd(cmdline: str) -> str:
+def _mk_tablespace_options(ttype: str, options: Mapping[str, Any]) -> List[str]:
+ cmds: List[str] = []
+ for flavour in ('data', 'index'):
+ if options['tablespaces'][f"{ttype}_{flavour}"]:
+ cmds.extend((f"--tablespace-{ttype}-{flavour}",
+ options['tablespaces'][f"{ttype}_{flavour}"]))
+
+ return cmds
+
+
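To make the effect of the new helper concrete, here is a self-contained copy of `_mk_tablespace_options()` from the hunk above applied to a hypothetical tablespace configuration:

```python
from typing import Any, List, Mapping

def _mk_tablespace_options(ttype: str, options: Mapping[str, Any]) -> List[str]:
    # copied verbatim from the hunk above
    cmds: List[str] = []
    for flavour in ('data', 'index'):
        if options['tablespaces'][f"{ttype}_{flavour}"]:
            cmds.extend((f"--tablespace-{ttype}-{flavour}",
                         options['tablespaces'][f"{ttype}_{flavour}"]))
    return cmds

# hypothetical configuration: the slim index stays on the default tablespace
opts = {'tablespaces': {'slim_data': 'ssd_slim', 'slim_index': '',
                        'main_data': 'ssd_main', 'main_index': 'ssd_main'}}

assert _mk_tablespace_options('slim', opts) == ['--tablespace-slim-data', 'ssd_slim']
assert _mk_tablespace_options('main', opts) == ['--tablespace-main-data', 'ssd_main',
                                                '--tablespace-main-index', 'ssd_main']
```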
+def _find_osm2pgsql_cmd(cmdline: Optional[str]) -> str:
if cmdline is not None:
return cmdline
in_path = shutil.which('osm2pgsql')
if in_path is None:
- raise RuntimeError('osm2pgsql executable not found. Please install osm2pgsql first.')
+ raise UsageError('osm2pgsql executable not found. Please install osm2pgsql first.')
return str(in_path)
+
+
+def _check_osm2pgsql_version(cmdline: Optional[str]) -> None:
+ cmd = [_find_osm2pgsql_cmd(cmdline), '--version']
+
+ result = subprocess.run(cmd, capture_output=True, check=True)
+
+ if not result.stderr:
+ raise UsageError("osm2pgsql does not print version information.")
+
+ verinfo = result.stderr.decode('UTF-8')
+
+ match = re.search(r'osm2pgsql version (\d+)\.(\d+)', verinfo)
+ if match is None:
+ raise UsageError(f"No version information found in output: {verinfo}")
+
+ if (int(match[1]), int(match[2])) < OSM2PGSQL_REQUIRED_VERSION:
+ raise UsageError(f"osm2pgsql is too old. Found version {match[1]}.{match[2]}. "
+ f"Need at least version {'.'.join(map(str, OSM2PGSQL_REQUIRED_VERSION))}.")
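A short illustration of what `_check_osm2pgsql_version()` extracts from the `--version` output (the sample line mimics the format of the osm2pgsql mock added to the test fixtures further down; the version number itself is invented):

```python
import re

OSM2PGSQL_REQUIRED_VERSION = (1, 8)   # as defined in version.py above

verinfo = "2024-08-09 11:16:23  osm2pgsql version 1.11.0 (1.11.0)"

match = re.search(r'osm2pgsql version (\d+)\.(\d+)', verinfo)
assert match is not None
assert (int(match[1]), int(match[2])) >= OSM2PGSQL_REQUIRED_VERSION   # (1, 11) >= (1, 8)
```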
import types
import urllib.request as urlrequest
-import requests
-
from ..errors import UsageError
from ..db import status
from ..db.connection import Connection, connect, server_version_tuple
from osmium.replication.server import ReplicationServer
from osmium import WriteHandler
from osmium import version as pyo_version
-except ImportError as exc:
+ import requests
+except ModuleNotFoundError as exc:
logging.getLogger().critical("pyosmium not installed. Replication functions not available.\n"
- "To install pyosmium via pip: pip3 install osmium")
+ "To install pyosmium via pip: pip install osmium")
raise UsageError("replication tools not available") from exc
LOG = logging.getLogger()
POSTGRESQL_REQUIRED_VERSION = (9, 6)
POSTGIS_REQUIRED_VERSION = (2, 2)
+OSM2PGSQL_REQUIRED_VERSION = (1, 8)
 # CMake sets a variable @GIT_HASH@ by executing 'git log'. It is not run
# on every execution of 'make'.
Then results contain
| osm | display_name |
| N1 | Wenig, Loudou |
- Scenario: OSM country relations outside expected boundaries are ignored
+
+ Scenario: OSM country relations outside expected boundaries are ignored for naming
Given the grid
| 1 | | 2 |
| 4 | | 3 |
Then results contain
| osm | display_name |
| N1 | Wenig, Germany |
+
Scenario: Pre-defined country names are used
Given the grid with origin CH
| 1 |
Then results contain
| osm | display_name |
| N1 | Ingb, Switzerland |
+
+ Scenario: For overlapping countries, pre-defined countries are tie-breakers
+ Given the grid with origin US
+ | 1 | | 2 | | 5 |
+ | | 9 | | 8 | |
+ | 4 | | 3 | | 6 |
+ Given the named places
+ | osm | class | type | admin | country | geometry |
+ | R1 | boundary | administrative | 2 | de | (1,5,6,4,1) |
+ | R2 | boundary | administrative | 2 | us | (1,2,3,4,1) |
+ And the named places
+ | osm | class | type | geometry |
+ | N1 | place | town | 9 |
+ | N2 | place | town | 8 |
+ When importing
+ Then placex contains
+ | object | country_code |
+ | N1 | us |
+ | N2 | de |
+
+    Scenario: For overlapping countries outside pre-defined countries, prefer the smaller partition
+ Given the grid with origin US
+ | 1 | | 2 | | 5 |
+ | | 9 | | 8 | |
+ | 4 | | 3 | | 6 |
+ Given the named places
+ | osm | class | type | admin | country | geometry |
+ | R1 | boundary | administrative | 2 | ch | (1,5,6,4,1) |
+ | R2 | boundary | administrative | 2 | de | (1,2,3,4,1) |
+ And the named places
+ | osm | class | type | geometry |
+ | N1 | place | town | 9 |
+ | N2 | place | town | 8 |
+ When importing
+ Then placex contains
+ | object | country_code |
+ | N1 | de |
+ | N2 | ch |
"""
from pathlib import Path
import pytest
+import pytest_asyncio
import time
import datetime as dt
for api in testapis:
api.close()
+
+
+@pytest_asyncio.fixture
+async def api(temp_db):
+ async with napi.NominatimAPIAsync(Path('/invalid')) as api:
+ yield api
from collections import namedtuple
import nominatim_api.v1.server_glue as glue
+from nominatim_api.v1.format import dispatch as formatting
from nominatim_api.config import Configuration
class FakeError(BaseException):
return FakeResponse(status, output, self.content_type)
- def base_uri(self) -> str:
+ def base_uri(self):
return 'http://test'
def config(self):
return self._config
+ def formatting(self):
+ return formatting
+
+
table_factory('word',
definition='word_id INT, word_token TEXT, type TEXT, word TEXT, info JSONB')
- api = NominatimAPIAsync(Path('/invalid'), {})
- async with api.begin() as conn:
- yield conn
- await api.close()
+ async with NominatimAPIAsync(Path('/invalid'), {}) as api:
+ async with api.begin() as conn:
+ yield conn
@pytest.mark.asyncio
temp_db_cursor.execute("""CREATE OR REPLACE FUNCTION make_standard_name(name TEXT)
RETURNS TEXT AS $$ SELECT lower(name); $$ LANGUAGE SQL;""")
- api = NominatimAPIAsync(Path('/invalid'), {})
- async with api.begin() as conn:
- yield conn
- await api.close()
+ async with NominatimAPIAsync(Path('/invalid'), {}) as api:
+ async with api.begin() as conn:
+ yield conn
@pytest.mark.asyncio
import pytest
-from nominatim_api import NominatimAPIAsync
from nominatim_api.search.query_analyzer_factory import make_query_analyzer
from nominatim_api.search.icu_tokenizer import ICUQueryAnalyzer
@pytest.mark.asyncio
-async def test_import_icu_tokenizer(table_factory):
+async def test_import_icu_tokenizer(table_factory, api):
table_factory('nominatim_properties',
definition='property TEXT, value TEXT',
content=(('tokenizer', 'icu'),
('tokenizer_import_normalisation', ':: lower();'),
('tokenizer_import_transliteration', "'1' > '/1/'; 'ä' > 'ä '")))
- api = NominatimAPIAsync(Path('/invalid'), {})
async with api.begin() as conn:
ana = await make_query_analyzer(conn)
assert isinstance(ana, ICUQueryAnalyzer)
- await api.close()
@pytest.mark.asyncio
-async def test_import_missing_property(table_factory):
- api = NominatimAPIAsync(Path('/invalid'), {})
+async def test_import_missing_property(table_factory, api):
table_factory('nominatim_properties',
definition='property TEXT, value TEXT')
async with api.begin() as conn:
with pytest.raises(ValueError, match='Property.*not found'):
await make_query_analyzer(conn)
- await api.close()
@pytest.mark.asyncio
-async def test_import_missing_module(table_factory):
- api = NominatimAPIAsync(Path('/invalid'), {})
+async def test_import_missing_module(table_factory, api):
table_factory('nominatim_properties',
definition='property TEXT, value TEXT',
content=(('tokenizer', 'missing'),))
async with api.begin() as conn:
with pytest.raises(RuntimeError, match='Tokenizer not found'):
await make_query_analyzer(conn)
- await api.close()
-
"""
from pathlib import Path
import pytest
-import pytest_asyncio
import sqlalchemy as sa
-from nominatim_api import NominatimAPIAsync
-
-@pytest_asyncio.fixture
-async def apiobj(temp_db):
- """ Create an asynchronous SQLAlchemy engine for the test DB.
- """
- api = NominatimAPIAsync(Path('/invalid'), {})
- yield api
- await api.close()
-
@pytest.mark.asyncio
-async def test_run_scalar(apiobj, table_factory):
+async def test_run_scalar(api, table_factory):
table_factory('foo', definition='that TEXT', content=(('a', ),))
- async with apiobj.begin() as conn:
+ async with api.begin() as conn:
assert await conn.scalar(sa.text('SELECT * FROM foo')) == 'a'
@pytest.mark.asyncio
-async def test_run_execute(apiobj, table_factory):
+async def test_run_execute(api, table_factory):
table_factory('foo', definition='that TEXT', content=(('a', ),))
- async with apiobj.begin() as conn:
+ async with api.begin() as conn:
result = await conn.execute(sa.text('SELECT * FROM foo'))
assert result.fetchone()[0] == 'a'
@pytest.mark.asyncio
-async def test_get_property_existing_cached(apiobj, table_factory):
+async def test_get_property_existing_cached(api, table_factory):
table_factory('nominatim_properties',
definition='property TEXT, value TEXT',
content=(('dbv', '96723'), ))
- async with apiobj.begin() as conn:
+ async with api.begin() as conn:
assert await conn.get_property('dbv') == '96723'
await conn.execute(sa.text('TRUNCATE nominatim_properties'))
@pytest.mark.asyncio
-async def test_get_property_existing_uncached(apiobj, table_factory):
+async def test_get_property_existing_uncached(api, table_factory):
table_factory('nominatim_properties',
definition='property TEXT, value TEXT',
content=(('dbv', '96723'), ))
- async with apiobj.begin() as conn:
+ async with api.begin() as conn:
assert await conn.get_property('dbv') == '96723'
await conn.execute(sa.text("UPDATE nominatim_properties SET value = '1'"))
@pytest.mark.asyncio
@pytest.mark.parametrize('param', ['foo', 'DB:server_version'])
-async def test_get_property_missing(apiobj, table_factory, param):
+async def test_get_property_missing(api, table_factory, param):
table_factory('nominatim_properties',
definition='property TEXT, value TEXT')
- async with apiobj.begin() as conn:
+ async with api.begin() as conn:
with pytest.raises(ValueError):
await conn.get_property(param)
@pytest.mark.asyncio
-async def test_get_db_property_existing(apiobj):
- async with apiobj.begin() as conn:
+async def test_get_db_property_existing(api):
+ async with api.begin() as conn:
assert await conn.get_db_property('server_version') > 0
@pytest.mark.asyncio
-async def test_get_db_property_existing(apiobj):
- async with apiobj.begin() as conn:
+async def test_get_db_property_existing(api):
+ async with api.begin() as conn:
with pytest.raises(ValueError):
await conn.get_db_property('dfkgjd.rijg')
from pathlib import Path
import pytest
-import pytest_asyncio
from fake_adaptor import FakeAdaptor, FakeError, FakeResponse
import nominatim_api.v1.server_glue as glue
-import nominatim_api as napi
-
-@pytest_asyncio.fixture
-async def api():
- api = napi.NominatimAPIAsync(Path('/invalid'))
- yield api
- await api.close()
-
class TestDeletableEndPoint:
{'place_id': 3, 'country_code': 'cd', 'name': None,
'osm_id': 781, 'osm_type': 'R',
'class': 'landcover', 'type': 'grass'}]
-
from pathlib import Path
import pytest
-import pytest_asyncio
from fake_adaptor import FakeAdaptor, FakeError, FakeResponse
import nominatim_api.v1.server_glue as glue
-import nominatim_api as napi
-
-@pytest_asyncio.fixture
-async def api():
- api = napi.NominatimAPIAsync(Path('/invalid'))
- yield api
- await api.close()
-
class TestPolygonsEndPoint:
import pytest
-import nominatim_api.v1 as api_impl
+from nominatim_api.v1.format import dispatch as v1_format
import nominatim_api as napi
STATUS_FORMATS = {'text', 'json'}
# StatusResult
def test_status_format_list():
- assert set(api_impl.list_formats(napi.StatusResult)) == STATUS_FORMATS
+ assert set(v1_format.list_formats(napi.StatusResult)) == STATUS_FORMATS
@pytest.mark.parametrize('fmt', list(STATUS_FORMATS))
def test_status_supported(fmt):
- assert api_impl.supports_format(napi.StatusResult, fmt)
+ assert v1_format.supports_format(napi.StatusResult, fmt)
def test_status_unsupported():
- assert not api_impl.supports_format(napi.StatusResult, 'gagaga')
+ assert not v1_format.supports_format(napi.StatusResult, 'gagaga')
def test_status_format_text():
- assert api_impl.format_result(napi.StatusResult(0, 'message here'), 'text', {}) == 'OK'
+ assert v1_format.format_result(napi.StatusResult(0, 'message here'), 'text', {}) == 'OK'
def test_status_format_text():
- assert api_impl.format_result(napi.StatusResult(500, 'message here'), 'text', {}) == 'ERROR: message here'
+ assert v1_format.format_result(napi.StatusResult(500, 'message here'), 'text', {}) == 'ERROR: message here'
def test_status_format_json_minimal():
status = napi.StatusResult(700, 'Bad format.')
- result = api_impl.format_result(status, 'json', {})
+ result = v1_format.format_result(status, 'json', {})
assert result == \
f'{{"status":700,"message":"Bad format.","software_version":"{napi.__version__}"}}'
status.data_updated = dt.datetime(2010, 2, 7, 20, 20, 3, 0, tzinfo=dt.timezone.utc)
status.database_version = '5.6'
- result = api_impl.format_result(status, 'json', {})
+ result = v1_format.format_result(status, 'json', {})
assert result == \
f'{{"status":0,"message":"OK","data_updated":"2010-02-07T20:20:03+00:00","software_version":"{napi.__version__}","database_version":"5.6"}}'
('place', 'thing'),
napi.Point(1.0, 2.0))
- result = api_impl.format_result(search, 'json', {})
+ result = v1_format.format_result(search, 'json', {})
assert json.loads(result) == \
{'category': 'place',
)
search.localize(napi.Locales())
- result = api_impl.format_result(search, 'json', {})
+ result = v1_format.format_result(search, 'json', {})
assert json.loads(result) == \
{'place_id': 37563,
napi.Point(1.0, 2.0),
geometry={'type': gtype})
- result = api_impl.format_result(search, 'json', {})
+ result = v1_format.format_result(search, 'json', {})
js = json.loads(result)
assert js['geometry'] == {'type': 'Point', 'coordinates': [1.0, 2.0]}
napi.Point(1.0, 2.0),
geometry={'geojson': '{"type":"Point","coordinates":[56.947,-87.44]}'})
- result = api_impl.format_result(search, 'json', {})
+ result = v1_format.format_result(search, 'json', {})
js = json.loads(result)
assert js['geometry'] == {'type': 'Point', 'coordinates': [56.947, -87.44]}
('amenity', 'restaurant'),
napi.Point(1.0, 2.0))
- result = api_impl.format_result(search, 'json', {'icon_base_url': 'foo'})
+ result = v1_format.format_result(search, 'json', {'icon_base_url': 'foo'})
js = json.loads(result)
assert js['icon'] == 'foo/food_restaurant.p.20.png'
('amenity', 'tree'),
napi.Point(1.0, 2.0))
- result = api_impl.format_result(search, 'json', {'icon_base_url': 'foo'})
+ result = v1_format.format_result(search, 'json', {'icon_base_url': 'foo'})
js = json.loads(result)
assert 'icon' not in js
distance=0.0)
])
- result = api_impl.format_result(search, 'json', {})
+ result = v1_format.format_result(search, 'json', {})
js = json.loads(result)
assert js['address'] == [{'localname': '',
distance=0.034)
])
- result = api_impl.format_result(search, 'json', {})
+ result = v1_format.format_result(search, 'json', {})
js = json.loads(result)
assert js[outfield] == [{'localname': 'Trespass',
distance=0.034)
])
- result = api_impl.format_result(search, 'json', {'group_hierarchy': True})
+ result = v1_format.format_result(search, 'json', {'group_hierarchy': True})
js = json.loads(result)
assert js['hierarchy'] == {'note': [{'localname': 'Trespass',
napi.WordInfo(23, 'foo', 'mefoo'),
napi.WordInfo(24, 'foo', 'bafoo')])
- result = api_impl.format_result(search, 'json', {'keywords': True})
+ result = v1_format.format_result(search, 'json', {'keywords': True})
js = json.loads(result)
assert js['keywords'] == {'name': [{'id': 23, 'token': 'foo'},
napi.WordInfo(23, 'foo', 'mefoo'),
napi.WordInfo(24, 'foo', 'bafoo')])
- result = api_impl.format_result(search, 'json', {'keywords': True})
+ result = v1_format.format_result(search, 'json', {'keywords': True})
js = json.loads(result)
assert js['keywords'] == {'address': [{'id': 23, 'token': 'foo'},
import pytest
-import nominatim_api.v1 as api_impl
+from nominatim_api.v1.format import dispatch as v1_format
import nominatim_api as napi
FORMATS = ['json', 'jsonv2', 'geojson', 'geocodejson', 'xml']
('amenity', 'post_box'),
napi.Point(0.3, -8.9))
- raw = api_impl.format_result(napi.ReverseResults([reverse]), fmt, {})
+ raw = v1_format.format_result(napi.ReverseResults([reverse]), fmt, {})
if fmt == 'xml':
root = ET.fromstring(raw)
@pytest.mark.parametrize('fmt', FORMATS)
def test_format_reverse_no_result(fmt):
- raw = api_impl.format_result(napi.ReverseResults(), fmt, {})
+ raw = v1_format.format_result(napi.ReverseResults(), fmt, {})
if fmt == 'xml':
root = ET.fromstring(raw)
place_id=5564,
osm_object=('N', 23))
- raw = api_impl.format_result(napi.ReverseResults([reverse]), fmt, {})
+ raw = v1_format.format_result(napi.ReverseResults([reverse]), fmt, {})
if fmt == 'xml':
root = ET.fromstring(raw).find('result')
]))
reverse.localize(napi.Locales())
- raw = api_impl.format_result(napi.ReverseResults([reverse]), fmt,
+ raw = v1_format.format_result(napi.ReverseResults([reverse]), fmt,
{'addressdetails': True})
reverse.localize(napi.Locales())
- raw = api_impl.format_result(napi.ReverseResults([reverse]), 'geocodejson',
+ raw = v1_format.format_result(napi.ReverseResults([reverse]), 'geocodejson',
{'addressdetails': True})
props = json.loads(raw)['features'][0]['properties']['geocoding']
napi.Point(1.0, 2.0),
address_rows=napi.AddressLines())
- raw = api_impl.format_result(napi.ReverseResults([reverse]), fmt,
+ raw = v1_format.format_result(napi.ReverseResults([reverse]), fmt,
{'addressdetails': True})
napi.Point(1.0, 2.0),
extratags={'one': 'A', 'two':'B'})
- raw = api_impl.format_result(napi.ReverseResults([reverse]), fmt,
+ raw = v1_format.format_result(napi.ReverseResults([reverse]), fmt,
{'extratags': True})
if fmt == 'xml':
('place', 'thing'),
napi.Point(1.0, 2.0))
- raw = api_impl.format_result(napi.ReverseResults([reverse]), fmt,
+ raw = v1_format.format_result(napi.ReverseResults([reverse]), fmt,
{'extratags': True})
if fmt == 'xml':
napi.Point(1.0, 2.0),
names={'name': 'A', 'ref':'1'})
- raw = api_impl.format_result(napi.ReverseResults([reverse]), fmt,
+ raw = v1_format.format_result(napi.ReverseResults([reverse]), fmt,
{'namedetails': True})
if fmt == 'xml':
('place', 'thing'),
napi.Point(1.0, 2.0))
- raw = api_impl.format_result(napi.ReverseResults([reverse]), fmt,
+ raw = v1_format.format_result(napi.ReverseResults([reverse]), fmt,
{'namedetails': True})
if fmt == 'xml':
('amenity', 'restaurant'),
napi.Point(1.0, 2.0))
- result = api_impl.format_result(napi.ReverseResults([reverse]), fmt,
+ result = v1_format.format_result(napi.ReverseResults([reverse]), fmt,
{'icon_base_url': 'foo'})
js = json.loads(result)
('amenity', 'tree'),
napi.Point(1.0, 2.0))
- result = api_impl.format_result(napi.ReverseResults([reverse]), fmt,
+ result = v1_format.format_result(napi.ReverseResults([reverse]), fmt,
{'icon_base_url': 'foo'})
assert 'icon' not in json.loads(result)
def test_adaptor_parse_format_use_default():
adaptor = FakeAdaptor()
- assert adaptor.parse_format(napi.StatusResult, 'text') == 'text'
+ assert glue.parse_format(adaptor, napi.StatusResult, 'text') == 'text'
assert adaptor.content_type == 'text/plain; charset=utf-8'
def test_adaptor_parse_format_use_configured():
adaptor = FakeAdaptor(params={'format': 'json'})
- assert adaptor.parse_format(napi.StatusResult, 'text') == 'json'
+ assert glue.parse_format(adaptor, napi.StatusResult, 'text') == 'json'
assert adaptor.content_type == 'application/json; charset=utf-8'
adaptor = FakeAdaptor(params={'format': '@!#'})
with pytest.raises(FakeError, match='^400 -- .*must be one of'):
- adaptor.parse_format(napi.StatusResult, 'text')
+ glue.parse_format(adaptor, napi.StatusResult, 'text')
# ASGIAdaptor.get_accepted_languages()
def test_accepted_languages_from_param():
a = FakeAdaptor(params={'accept-language': 'de'})
- assert a.get_accepted_languages() == 'de'
+ assert glue.get_accepted_languages(a) == 'de'
def test_accepted_languages_from_header():
a = FakeAdaptor(headers={'accept-language': 'de'})
- assert a.get_accepted_languages() == 'de'
+ assert glue.get_accepted_languages(a) == 'de'
def test_accepted_languages_from_default(monkeypatch):
monkeypatch.setenv('NOMINATIM_DEFAULT_LANGUAGE', 'de')
a = FakeAdaptor()
- assert a.get_accepted_languages() == 'de'
+ assert glue.get_accepted_languages(a) == 'de'
def test_accepted_languages_param_over_header():
a = FakeAdaptor(params={'accept-language': 'de'},
headers={'accept-language': 'en'})
- assert a.get_accepted_languages() == 'de'
+ assert glue.get_accepted_languages(a) == 'de'
def test_accepted_languages_header_over_default(monkeypatch):
monkeypatch.setenv('NOMINATIM_DEFAULT_LANGUAGE', 'en')
a = FakeAdaptor(headers={'accept-language': 'de'})
- assert a.get_accepted_languages() == 'de'
+ assert glue.get_accepted_languages(a) == 'de'
# ASGIAdaptor.raise_error()
@pytest.fixture(autouse=True)
def init_adaptor(self):
self.adaptor = FakeAdaptor()
- self.adaptor.setup_debugging()
+ glue.setup_debugging(self.adaptor)
def run_raise_error(self, msg, status):
with pytest.raises(FakeError) as excinfo:
err = self.run_raise_error('TEST', 404)
assert self.adaptor.content_type == 'text/plain; charset=utf-8'
- assert err.msg == 'TEST'
+ assert err.msg == 'ERROR 404: TEST'
assert err.status == 404
def test_raise_error_during_debug():
a = FakeAdaptor(params={'debug': '1'})
- a.setup_debugging()
+ glue.setup_debugging(a)
loglib.log().section('Ongoing')
with pytest.raises(FakeError) as excinfo:
# ASGIAdaptor.build_response
def test_build_response_without_content_type():
- resp = FakeAdaptor().build_response('attention')
+ resp = glue.build_response(FakeAdaptor(), 'attention')
assert isinstance(resp, FakeResponse)
assert resp.status == 200
def test_build_response_with_status():
a = FakeAdaptor(params={'format': 'json'})
- a.parse_format(napi.StatusResult, 'text')
+ glue.parse_format(a, napi.StatusResult, 'text')
- resp = a.build_response('stuff\nmore stuff', status=404)
+ resp = glue.build_response(a, 'stuff\nmore stuff', status=404)
assert isinstance(resp, FakeResponse)
assert resp.status == 404
def test_build_response_jsonp_with_json():
a = FakeAdaptor(params={'format': 'json', 'json_callback': 'test.func'})
- a.parse_format(napi.StatusResult, 'text')
+ glue.parse_format(a, napi.StatusResult, 'text')
- resp = a.build_response('{}')
+ resp = glue.build_response(a, '{}')
assert isinstance(resp, FakeResponse)
assert resp.status == 200
def test_build_response_jsonp_without_json():
a = FakeAdaptor(params={'format': 'text', 'json_callback': 'test.func'})
- a.parse_format(napi.StatusResult, 'text')
+ glue.parse_format(a, napi.StatusResult, 'text')
- resp = a.build_response('{}')
+ resp = glue.build_response(a, '{}')
assert isinstance(resp, FakeResponse)
assert resp.status == 200
@pytest.mark.parametrize('param', ['alert(); func', '\\n', '', 'a b'])
def test_build_response_jsonp_bad_format(param):
a = FakeAdaptor(params={'format': 'json', 'json_callback': param})
- a.parse_format(napi.StatusResult, 'text')
+ glue.parse_format(a, napi.StatusResult, 'text')
with pytest.raises(FakeError, match='^400 -- .*Invalid'):
- a.build_response('{}')
+ glue.build_response(a, '{}')
# status_endpoint()
import nominatim_db.clicmd.api
import nominatim_api as napi
+@pytest.mark.parametrize('call', ['search', 'reverse', 'lookup', 'details', 'status'])
+def test_list_format(cli_call, call):
+ assert 0 == cli_call(call, '--list-formats')
+
+
+@pytest.mark.parametrize('call', ['search', 'reverse', 'lookup', 'details', 'status'])
+def test_bad_format(cli_call, call):
+ assert 1 == cli_call(call, '--format', 'rsdfsdfsdfsaefsdfsd')
+
class TestCliStatusCall:
import pytest
@pytest.fixture
-def osm2pgsql_options(temp_db):
- """ A standard set of options for osm2pgsql.
+def osm2pgsql_options(temp_db, tmp_path):
+ """ A standard set of options for osm2pgsql
+    together with an osm2pgsql mock that simply echoes the command line.
"""
- return dict(osm2pgsql='echo',
+ osm2pgsql_exec = tmp_path / 'osm2pgsql_mock'
+
+ osm2pgsql_exec.write_text("""#!/bin/sh
+
+if [ "$*" = "--version" ]; then
+ >&2 echo "2024-08-09 11:16:23 osm2pgsql version 11.7.2 (11.7.2)"
+else
+ echo "$@"
+fi
+ """)
+ osm2pgsql_exec.chmod(0o777)
+
+ return dict(osm2pgsql=str(osm2pgsql_exec),
osm2pgsql_cache=10,
osm2pgsql_style='style.file',
threads=1,
[Service]
Type=simple
-Environment="PYTHONPATH=/usr/local/lib/nominatim/lib-python/"
User=www-data
Group=www-data
WorkingDirectory=$USERHOME/nominatim-project