run: |
sudo apt-get install -y -qq libboost-system-dev libboost-filesystem-dev libexpat1-dev zlib1g-dev libbz2-dev libpq-dev libproj-dev libicu-dev liblua${LUA_VERSION}-dev lua${LUA_VERSION}
if [ "$FLAVOUR" == "oldstuff" ]; then
- pip3 install MarkupSafe==2.0.1 python-dotenv psycopg2==2.7.7 jinja2==2.8 psutil==5.4.2 pyicu==2.9 osmium PyYAML==5.1 sqlalchemy==1.4 datrie asyncpg
+ pip3 install MarkupSafe==2.0.1 python-dotenv psycopg2==2.7.7 jinja2==2.8 psutil==5.4.2 pyicu==2.9 osmium PyYAML==5.1 sqlalchemy==1.4 GeoAlchemy2==0.10.0 datrie asyncpg
else
sudo apt-get install -y -qq python3-icu python3-datrie python3-pyosmium python3-jinja2 python3-psutil python3-psycopg2 python3-dotenv python3-yaml python3-asyncpg
- pip3 install sqlalchemy
+ pip3 install sqlalchemy GeoAlchemy2
fi
shell: bash
env:
uses: shivammathur/setup-php@v2
with:
php-version: ${{ matrix.php }}
- tools: phpunit, phpcs, composer
+ tools: phpunit:9, phpcs, composer
ini-values: opcache.jit=disable
- uses: actions/setup-python@v4
if: (matrix.flavour == 'oldstuff') || (matrix.flavour == 'ubuntu-22')
- name: Install test prerequisites (from apt for Ubuntu 2x)
- run: sudo apt-get install -y -qq python3-pytest uvicorn
+ run: sudo apt-get install -y -qq python3-pytest python3-pytest-asyncio uvicorn
if: matrix.flavour != 'oldstuff'
+ - name: Install newer pytest-asyncio
+ run: pip3 install -U pytest-asyncio
+ if: matrix.flavour == 'ubuntu-20'
+
- name: Install test prerequisites (from pip for Ubuntu 18)
- run: pip3 install pytest uvicorn
+ run: pip3 install pytest pytest-asyncio uvicorn
if: matrix.flavour == 'oldstuff'
- name: Install Python webservers
- run: pip3 install falcon sanic sanic-testing starlette
+ run: pip3 install falcon sanic sanic-testing sanic-cors starlette
- - name: Install latest pylint/mypy
- run: pip3 install -U pylint mypy types-PyYAML types-jinja2 types-psycopg2 types-psutil types-requests typing-extensions asgi_lifespan sqlalchemy2-stubs
+ - name: Install latest pylint
+ run: pip3 install -U pylint asgi_lifespan
- name: PHP linting
run: phpcs --report-width=120 .
- name: PHP unit tests
run: phpunit ./
working-directory: Nominatim/test/php
- if: ${{ (matrix.ubuntu == 20) || (matrix.ubuntu == 22) }}
- name: Python unit tests
run: python3 -m pytest test/python
python3 -m behave -DREMOVE_TEMPLATE=1 -DBUILDDIR=$GITHUB_WORKSPACE/build --format=progress3
working-directory: Nominatim/test/bdd
- - name: Install newer Python packages (for typechecking info)
- run: pip3 install -U osmium uvicorn
+ - name: Install mypy and typechecking info
+ run: pip3 install -U mypy osmium uvicorn types-PyYAML types-jinja2 types-psycopg2 types-psutil types-requests types-ujson typing-extensions
if: matrix.flavour != 'oldstuff'
- name: Python static typechecking
[mypy]
plugins = sqlalchemy.ext.mypy.plugin
+[mypy-sanic_cors.*]
+ignore_missing_imports = True
+
[mypy-icu.*]
ignore_missing_imports = True
[mypy-falcon.*]
ignore_missing_imports = True
+
+[mypy-geoalchemy2.*]
+ignore_missing_imports = True
# 'too-many-ancestors' is triggered already by deriving from UserDict
# 'not-context-manager' disabled because it causes false positives once
# typed Python is enabled. See also https://github.com/PyCQA/pylint/issues/5273
-disable=too-few-public-methods,duplicate-code,too-many-ancestors,bad-option-value,no-self-use,not-context-manager
+disable=too-few-public-methods,duplicate-code,too-many-ancestors,bad-option-value,no-self-use,not-context-manager,use-dict-literal
-good-names=i,x,y,m,fd,db,cc
+good-names=i,x,y,m,t,fd,db,cc
* [psutil](https://github.com/giampaolo/psutil)
* [Jinja2](https://palletsprojects.com/p/jinja/)
* [SQLAlchemy](https://www.sqlalchemy.org/) (1.4+ with greenlet support)
+ * [GeoAlchemy2](https://geoalchemy-2.readthedocs.io/) (0.10+)
* [asyncpg](https://magicstack.github.io/asyncpg) (0.8+)
* [PyICU](https://pypi.org/project/PyICU/)
* [PyYaml](https://pyyaml.org/) (5.1+)
* one of the following web frameworks:
* [falcon](https://falconframework.org/) (3.0+)
- * [sanic](https://sanic.dev)
+ * [sanic](https://sanic.dev) and (optionally) [sanic-cors](https://github.com/ashleysommer/sanic-cors)
* [starlette](https://www.starlette.io/)
* [uvicorn](https://www.uvicorn.org/) (only with falcon and starlette frameworks)
* `ref` - content of `ref` tag if it exists
* `lat`, `lon` - latitude and longitude of the centroid of the object
* `boundingbox` - comma-separated list of corner coordinates ([see notes](#boundingbox))
- * `place_rank` - class [search rank](../customize/Ranking#search-rank)
- * `address_rank` - place [address rank](../customize/Ranking#address-rank)
+ * `place_rank` - class [search rank](../customize/Ranking.md#search-rank)
+ * `address_rank` - place [address rank](../customize/Ranking.md#address-rank)
* `display_name` - full comma-separated address
* `class`, `type` - key and value of the main OSM tag
* `importance` - computed importance rank
!!! warning "Deprecation warning"
The reverse API used to allow address lookup for a single OSM object by
- its OSM id. This use is now deprecated. Use the [Address Lookup API](../Lookup)
+ its OSM id. This use is now deprecated. Use the [Address Lookup API](Lookup.md)
instead.
### Output format
The main value for importance is derived from page ranking values for Wikipedia
pages for a place. For places that do not have their own
Wikipedia page, a formula is used that derives a static importance from the
-places [search rank](../customize/Ranking#search-rank).
+place's [search rank](../customize/Ranking.md#search-rank).
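+
+As a rough sketch (mirroring the `calculated_importance()` fallback in the
+Python frontend; the exact constant is an implementation detail):
+
+```python
+# static fallback importance derived from the search rank
+importance = 0.7500001 - (rank_search / 40.0)
+```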
In a second step, a secondary importance value is added which is meant to
represent how well-known the general area is where the place is located. It
* [mypy](http://mypy-lang.org/) (plus typing information for external libs)
* [Python Typing Extensions](https://github.com/python/typing_extensions) (for Python < 3.9)
* [pytest](https://pytest.org)
+* [pytest-asyncio](https://pytest-asyncio.readthedocs.io)
For testing the Python search frontend, you need to install extra dependencies
depending on your choice of webserver framework:
sudo apt install php-cgi phpunit php-codesniffer \
python3-pip python3-setuptools python3-dev
-pip3 install --user behave mkdocs mkdocstrings pytest pylint \
+pip3 install --user behave mkdocs mkdocstrings pytest pytest-asyncio pylint \
mypy types-PyYAML types-jinja2 types-psycopg2 types-psutil \
+ types-ujson types-requests typing-extensions \
sanic-testing httpx asgi-lifespan
```
$aFilteredPlaces['properties']['geocoding']['osm_id'] = $aPlace['osm_id'];
}
+ $aFilteredPlaces['properties']['geocoding']['osm_key'] = $aPlace['class'];
+ $aFilteredPlaces['properties']['geocoding']['osm_value'] = $aPlace['type'];
+
$aFilteredPlaces['properties']['geocoding']['type'] = addressRankToGeocodeJsonType($aPlace['rank_address']);
$aFilteredPlaces['properties']['geocoding']['accuracy'] = (int) $fDistance;
+++ /dev/null
-# SPDX-License-Identifier: GPL-2.0-only
-#
-# This file is part of Nominatim. (https://nominatim.org)
-#
-# Copyright (C) 2022 by the Nominatim developer community.
-# For a full list of authors see the git log.
-"""
-Implementation of classes for API access via libraries.
-"""
-from typing import Mapping, Optional, cast, Any
-import asyncio
-from pathlib import Path
-
-from sqlalchemy import text, event
-from sqlalchemy.engine.url import URL
-from sqlalchemy.ext.asyncio import create_async_engine
-import asyncpg
-
-from nominatim.config import Configuration
-from nominatim.apicmd.status import get_status, StatusResult
-
-class NominatimAPIAsync:
- """ API loader asynchornous version.
- """
- def __init__(self, project_dir: Path,
- environ: Optional[Mapping[str, str]] = None) -> None:
- self.config = Configuration(project_dir, environ)
-
- dsn = self.config.get_database_params()
-
- dburl = URL.create(
- 'postgresql+asyncpg',
- database=dsn.get('dbname'),
- username=dsn.get('user'), password=dsn.get('password'),
- host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
- query={k: v for k, v in dsn.items()
- if k not in ('user', 'password', 'dbname', 'host', 'port')})
- self.engine = create_async_engine(
- dburl, future=True,
- connect_args={'server_settings': {
- 'DateStyle': 'sql,european',
- 'max_parallel_workers_per_gather': '0'
- }})
- asyncio.get_event_loop().run_until_complete(self._query_server_version())
- asyncio.get_event_loop().run_until_complete(self.close())
-
- if self.server_version >= 110000:
- @event.listens_for(self.engine.sync_engine, "connect") # type: ignore[misc]
- def _on_connect(dbapi_con: Any, _: Any) -> None:
- cursor = dbapi_con.cursor()
- cursor.execute("SET jit_above_cost TO '-1'")
-
-
- async def _query_server_version(self) -> None:
- try:
- async with self.engine.begin() as conn:
- result = await conn.scalar(text('SHOW server_version_num'))
- self.server_version = int(cast(str, result))
- except asyncpg.PostgresError:
- self.server_version = 0
-
- async def close(self) -> None:
- """ Close all active connections to the database. The NominatimAPIAsync
- object remains usable after closing. If a new API functions is
- called, new connections are created.
- """
- await self.engine.dispose()
-
-
- async def status(self) -> StatusResult:
- """ Return the status of the database.
- """
- return await get_status(self.engine)
-
-
-class NominatimAPI:
- """ API loader, synchronous version.
- """
-
- def __init__(self, project_dir: Path,
- environ: Optional[Mapping[str, str]] = None) -> None:
- self.async_api = NominatimAPIAsync(project_dir, environ)
-
-
- def close(self) -> None:
- """ Close all active connections to the database. The NominatimAPIAsync
- object remains usable after closing. If a new API functions is
- called, new connections are created.
- """
- asyncio.get_event_loop().run_until_complete(self.async_api.close())
-
-
- def status(self) -> StatusResult:
- """ Return the status of the database.
- """
- return asyncio.get_event_loop().run_until_complete(self.async_api.status())
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+The public interface of the Nominatim library.
+
+Classes and functions defined in this file are considered stable. Always
+import from this file, not from the source files directly.
+"""
+
+# See also https://github.com/PyCQA/pylint/issues/6006
+# pylint: disable=useless-import-alias
+
+from .core import (NominatimAPI as NominatimAPI,
+ NominatimAPIAsync as NominatimAPIAsync)
+from .status import (StatusResult as StatusResult)
+from .types import (PlaceID as PlaceID,
+ OsmID as OsmID,
+ PlaceRef as PlaceRef,
+ Point as Point,
+ GeometryFormat as GeometryFormat,
+ LookupDetails as LookupDetails)
+from .results import (SourceTable as SourceTable,
+ AddressLine as AddressLine,
+ AddressLines as AddressLines,
+ WordInfo as WordInfo,
+ WordInfos as WordInfos,
+ SearchResult as SearchResult)
+from .localization import (Locales as Locales)
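+
+# A minimal usage sketch (illustrative only; assumes an existing Nominatim
+# project directory):
+#
+#   from pathlib import Path
+#   import nominatim.api as napi
+#
+#   api = napi.NominatimAPI(Path('.'))
+#   print(api.status().status)    # 0 when the database is reachable
+#   api.close()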
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Extended SQLAlchemy connection class that also includes access to the schema.
+"""
+from typing import Any, Mapping, Sequence, Union, Dict, cast
+
+import sqlalchemy as sa
+from sqlalchemy.ext.asyncio import AsyncConnection
+
+from nominatim.db.sqlalchemy_schema import SearchTables
+
+class SearchConnection:
+ """ An extended SQLAlchemy connection class, that also contains
+ then table definitions. The underlying asynchronous SQLAlchemy
+ connection can be accessed with the 'connection' property.
+ The 't' property is the collection of Nominatim tables.
+ """
+
+ def __init__(self, conn: AsyncConnection,
+ tables: SearchTables,
+ properties: Dict[str, Any]) -> None:
+ self.connection = conn
+ self.t = tables # pylint: disable=invalid-name
+ self._property_cache = properties
+
+
+ async def scalar(self, sql: sa.sql.base.Executable,
+ params: Union[Mapping[str, Any], None] = None
+ ) -> Any:
+ """ Execute a 'scalar()' query on the connection.
+ """
+ return await self.connection.scalar(sql, params)
+
+
+ async def execute(self, sql: sa.sql.base.Executable,
+ params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None] = None
+ ) -> 'sa.engine.Result[Any]':
+ """ Execute a 'execute()' query on the connection.
+ """
+ return await self.connection.execute(sql, params)
+
+
+ async def get_property(self, name: str, cached: bool = True) -> str:
+ """ Get a property from Nominatim's property table.
+
+ Property values are normally cached so that they are only
+ retrieved from the database when they are queried for the
+ first time with this function. Set 'cached' to False to force
+ reading the property from the database.
+
+ Raises a ValueError if the property does not exist.
+ """
+ if name.startswith('DB:'):
+ raise ValueError(f"Illegal property value '{name}'.")
+
+ if cached and name in self._property_cache:
+ return cast(str, self._property_cache[name])
+
+ sql = sa.select(self.t.properties.c.value)\
+ .where(self.t.properties.c.property == name)
+ value = await self.connection.scalar(sql)
+
+ if value is None:
+ raise ValueError(f"Property '{name}' not found in database.")
+
+ self._property_cache[name] = cast(str, value)
+
+ return cast(str, value)
+
+
+ async def get_db_property(self, name: str) -> Any:
+ """ Get a setting from the database. At the moment, only
+ 'server_version', the version of the database software, can
+ be retrieved with this function.
+
+ Raises a ValueError if the property does not exist.
+ """
+ if name != 'server_version':
+ raise ValueError(f"DB setting '{name}' not found in database.")
+
+ return self._property_cache['DB:server_version']
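+
+# Usage sketch (illustrative only): a SearchConnection is handed out by
+# NominatimAPIAsync.begin(), e.g.
+#
+#   async with api.begin() as conn:
+#       db_version = await conn.get_property('database_version')
+#       count = await conn.scalar(sa.select(sa.func.count())
+#                                   .select_from(conn.t.placex))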
--- /dev/null
+# SPDX-License-Identifier: GPL-2.0-only
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Implementation of classes for API access via libraries.
+"""
+from typing import Mapping, Optional, Any, AsyncIterator, Dict
+import asyncio
+import contextlib
+from pathlib import Path
+
+import sqlalchemy as sa
+import sqlalchemy.ext.asyncio as sa_asyncio
+import asyncpg
+
+from nominatim.db.sqlalchemy_schema import SearchTables
+from nominatim.config import Configuration
+from nominatim.api.connection import SearchConnection
+from nominatim.api.status import get_status, StatusResult
+from nominatim.api.lookup import get_place_by_id
+from nominatim.api.types import PlaceRef, LookupDetails
+from nominatim.api.results import SearchResult
+
+
+class NominatimAPIAsync:
+ """ API loader asynchornous version.
+ """
+ def __init__(self, project_dir: Path,
+ environ: Optional[Mapping[str, str]] = None) -> None:
+ self.config = Configuration(project_dir, environ)
+ self.server_version = 0
+
+ self._engine_lock = asyncio.Lock()
+ self._engine: Optional[sa_asyncio.AsyncEngine] = None
+ self._tables: Optional[SearchTables] = None
+ self._property_cache: Dict[str, Any] = {'DB:server_version': 0}
+
+
+ async def setup_database(self) -> None:
+ """ Set up the engine and connection parameters.
+
+ This function will be called implicitly when the database is
+ accessed for the first time. You may also call it explicitly so
+ that the first call is not delayed by the setup work.
+ """
+ async with self._engine_lock:
+ if self._engine:
+ return
+
+ dsn = self.config.get_database_params()
+
+ dburl = sa.engine.URL.create(
+ 'postgresql+asyncpg',
+ database=dsn.get('dbname'),
+ username=dsn.get('user'), password=dsn.get('password'),
+ host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
+ query={k: v for k, v in dsn.items()
+ if k not in ('user', 'password', 'dbname', 'host', 'port')})
+ engine = sa_asyncio.create_async_engine(
+ dburl, future=True,
+ connect_args={'server_settings': {
+ 'DateStyle': 'sql,european',
+ 'max_parallel_workers_per_gather': '0'
+ }})
+
+ try:
+ async with engine.begin() as conn:
+ result = await conn.scalar(sa.text('SHOW server_version_num'))
+ server_version = int(result)
+ except asyncpg.PostgresError:
+ server_version = 0
+
+ if server_version >= 110000:
+ @sa.event.listens_for(engine.sync_engine, "connect")
+ def _on_connect(dbapi_con: Any, _: Any) -> None:
+ cursor = dbapi_con.cursor()
+ cursor.execute("SET jit_above_cost TO '-1'")
+ # Make sure that all connections get the new settings
+ await self.close()
+
+ self._property_cache['DB:server_version'] = server_version
+
+ self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
+ self._engine = engine
+
+
+ async def close(self) -> None:
+ """ Close all active connections to the database. The NominatimAPIAsync
+ object remains usable after closing. If a new API function is
+ called, new connections are created.
+ """
+ if self._engine is not None:
+ await self._engine.dispose()
+
+
+ @contextlib.asynccontextmanager
+ async def begin(self) -> AsyncIterator[SearchConnection]:
+ """ Create a new connection with automatic transaction handling.
+
+ This function may be used to get low-level access to the database.
+ Refer to the documentation of SQLAlchemy for details on how to use
+ the connection object.
+ """
+ if self._engine is None:
+ await self.setup_database()
+
+ assert self._engine is not None
+ assert self._tables is not None
+
+ async with self._engine.begin() as conn:
+ yield SearchConnection(conn, self._tables, self._property_cache)
+
+
+ async def status(self) -> StatusResult:
+ """ Return the status of the database.
+ """
+ try:
+ async with self.begin() as conn:
+ status = await get_status(conn)
+ except asyncpg.PostgresError:
+ return StatusResult(700, 'Database connection failed')
+
+ return status
+
+
+ async def lookup(self, place: PlaceRef,
+ details: LookupDetails) -> Optional[SearchResult]:
+ """ Get detailed information about a place in the database.
+
+ Returns None if there is no entry under the given ID.
+ """
+ async with self.begin() as db:
+ return await get_place_by_id(db, place, details)
+
+
+class NominatimAPI:
+ """ API loader, synchronous version.
+ """
+
+ def __init__(self, project_dir: Path,
+ environ: Optional[Mapping[str, str]] = None) -> None:
+ self._loop = asyncio.new_event_loop()
+ self._async_api = NominatimAPIAsync(project_dir, environ)
+
+
+ def close(self) -> None:
+ """ Close all active connections to the database. The NominatimAPIAsync
+ object remains usable after closing. If a new API functions is
+ called, new connections are created.
+ """
+ self._loop.run_until_complete(self._async_api.close())
+ self._loop.close()
+
+
+ @property
+ def config(self) -> Configuration:
+ """ Return the configuration used by the API.
+ """
+ return self._async_api.config
+
+ def status(self) -> StatusResult:
+ """ Return the status of the database.
+ """
+ return self._loop.run_until_complete(self._async_api.status())
+
+
+ def lookup(self, place: PlaceRef,
+ details: LookupDetails) -> Optional[SearchResult]:
+ """ Get detailed information about a place in the database.
+ """
+ return self._loop.run_until_complete(self._async_api.lookup(place, details))
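+
+# Synchronous usage sketch (illustrative only; 'my-project' and the OSM id
+# are placeholders):
+#
+#   import nominatim.api as napi
+#
+#   api = napi.NominatimAPI(Path('my-project'))
+#   result = api.lookup(napi.OsmID('N', 123),
+#                       napi.LookupDetails(address_details=True))
+#   if result is not None:
+#       print(result.centroid.lat, result.centroid.lon)
+#   api.close()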
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Helper functions for localizing names of results.
+"""
+from typing import Mapping, List, Optional
+
+import re
+
+class Locales:
+ """ Helper class for localization of names.
+
+ It takes a list of language prefixes in their order of preferred
+ usage.
+ """
+
+ def __init__(self, langs: Optional[List[str]] = None):
+ self.languages = langs or []
+ self.name_tags: List[str] = []
+
+ # Build the list of supported tags. It is currently hard-coded.
+ self._add_lang_tags('name')
+ self._add_tags('name', 'brand')
+ self._add_lang_tags('official_name', 'short_name')
+ self._add_tags('official_name', 'short_name', 'ref')
+
+
+ def __bool__(self) -> bool:
+ return len(self.languages) > 0
+
+
+ def _add_tags(self, *tags: str) -> None:
+ for tag in tags:
+ self.name_tags.append(tag)
+ self.name_tags.append(f"_place_{tag}")
+
+
+ def _add_lang_tags(self, *tags: str) -> None:
+ for tag in tags:
+ for lang in self.languages:
+ self.name_tags.append(f"{tag}:{lang}")
+ self.name_tags.append(f"_place_{tag}:{lang}")
+
+
+ def display_name(self, names: Optional[Mapping[str, str]]) -> str:
+ """ Return the best matching name from a dictionary of names
+ containing different name variants.
+
+ If 'names' is None or empty, an empty string is returned. If no
+ appropriate localization is found, the first name is returned.
+ """
+ if not names:
+ return ''
+
+ if len(names) > 1:
+ for tag in self.name_tags:
+ if tag in names:
+ return names[tag]
+
+ # Nothing? Return any of the other names as a default.
+ return next(iter(names.values()))
+
+
+ @staticmethod
+ def from_accept_languages(langstr: str) -> 'Locales':
+ """ Create a localization object from a language list in the
+ format of HTTP accept-languages header.
+
+ The function tries to be forgiving of format errors by first splitting
+ the string into comma-separated parts and then parsing each
+ description separately. Badly formatted parts are then ignored.
+ """
+ # split string into languages
+ candidates = []
+ for desc in langstr.split(','):
+ m = re.fullmatch(r'\s*([a-z_-]+)(?:;\s*q\s*=\s*([01](?:\.\d+)?))?\s*',
+ desc, flags=re.I)
+ if m:
+ candidates.append((m[1], float(m[2] or 1.0)))
+
+ # sort the results by the weight of each language (preserving order).
+ candidates.sort(reverse=True, key=lambda e: e[1])
+
+ # If a language has a region variant, also add the language without
+ # the variant, but only if it isn't already in the list so as not to skew the weights.
+ languages = []
+ for lid, _ in candidates:
+ languages.append(lid)
+ parts = lid.split('-', 1)
+ if len(parts) > 1 and all(c[0] != parts[0] for c in candidates):
+ languages.append(parts[0])
+
+ return Locales(languages)
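+
+# Example sketch (illustrative only): parsing an HTTP Accept-Language header
+#
+#   locales = Locales.from_accept_languages('de-CH,en;q=0.8')
+#   # locales.languages is now roughly ['de-CH', 'de', 'en'] and
+#   # locales.display_name(names) picks the best matching name tag.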
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Implementation of place lookup by ID.
+"""
+from typing import Optional
+
+import sqlalchemy as sa
+
+from nominatim.typing import SaColumn, SaLabel, SaRow
+from nominatim.api.connection import SearchConnection
+import nominatim.api.types as ntyp
+import nominatim.api.results as nres
+
+def _select_column_geometry(column: SaColumn,
+ geometry_output: ntyp.GeometryFormat) -> SaLabel:
+ """ Create the appropriate column expression for selecting a
+ geometry for the details response.
+ """
+ if geometry_output & ntyp.GeometryFormat.GEOJSON:
+ return sa.literal_column(f"""
+ ST_AsGeoJSON(CASE WHEN ST_NPoints({column.name}) > 5000
+ THEN ST_SimplifyPreserveTopology({column.name}, 0.0001)
+ ELSE {column.name} END)
+ """).label('geometry_geojson')
+
+ return sa.func.ST_GeometryType(column).label('geometry_type')
+
+
+async def find_in_placex(conn: SearchConnection, place: ntyp.PlaceRef,
+ details: ntyp.LookupDetails) -> Optional[SaRow]:
+ """ Search for the given place in the placex table and return the
+ base information.
+ """
+ t = conn.t.placex
+ sql = sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
+ t.c.class_, t.c.type, t.c.admin_level,
+ t.c.address, t.c.extratags,
+ t.c.housenumber, t.c.postcode, t.c.country_code,
+ t.c.importance, t.c.wikipedia, t.c.indexed_date,
+ t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
+ t.c.linked_place_id,
+ sa.func.ST_X(t.c.centroid).label('x'),
+ sa.func.ST_Y(t.c.centroid).label('y'),
+ _select_column_geometry(t.c.geometry, details.geometry_output))
+
+ if isinstance(place, ntyp.PlaceID):
+ sql = sql.where(t.c.place_id == place.place_id)
+ elif isinstance(place, ntyp.OsmID):
+ sql = sql.where(t.c.osm_type == place.osm_type)\
+ .where(t.c.osm_id == place.osm_id)
+ if place.osm_class:
+ sql = sql.where(t.c.class_ == place.osm_class)
+ else:
+ sql = sql.order_by(t.c.class_)
+ sql = sql.limit(1)
+ else:
+ return None
+
+ return (await conn.execute(sql)).one_or_none()
+
+
+async def find_in_osmline(conn: SearchConnection, place: ntyp.PlaceRef,
+ details: ntyp.LookupDetails) -> Optional[SaRow]:
+ """ Search for the given place in the osmline table and return the
+ base information.
+ """
+ t = conn.t.osmline
+ sql = sa.select(t.c.place_id, t.c.osm_id, t.c.parent_place_id,
+ t.c.indexed_date, t.c.startnumber, t.c.endnumber,
+ t.c.step, t.c.address, t.c.postcode, t.c.country_code,
+ sa.func.ST_X(sa.func.ST_Centroid(t.c.linegeo)).label('x'),
+ sa.func.ST_Y(sa.func.ST_Centroid(t.c.linegeo)).label('y'),
+ _select_column_geometry(t.c.linegeo, details.geometry_output))
+
+ if isinstance(place, ntyp.PlaceID):
+ sql = sql.where(t.c.place_id == place.place_id)
+ elif isinstance(place, ntyp.OsmID) and place.osm_type == 'W':
+ # There may be multiple interpolations for a single way.
+ # If 'class' contains a number, return the one that belongs to that number.
+ sql = sql.where(t.c.osm_id == place.osm_id).limit(1)
+ if place.osm_class and place.osm_class.isdigit():
+ sql = sql.order_by(sa.func.greatest(0,
+ sa.func.least(int(place.osm_class) - t.c.endnumber),
+ t.c.startnumber - int(place.osm_class)))
+ else:
+ return None
+
+ return (await conn.execute(sql)).one_or_none()
+
+
+async def find_in_tiger(conn: SearchConnection, place: ntyp.PlaceRef,
+ details: ntyp.LookupDetails) -> Optional[SaRow]:
+ """ Search for the given place in the table of Tiger addresses and return
+ the base information. Only lookup by place ID is supported.
+ """
+ t = conn.t.tiger
+ sql = sa.select(t.c.place_id, t.c.parent_place_id,
+ t.c.startnumber, t.c.endnumber, t.c.step,
+ t.c.postcode,
+ sa.func.ST_X(sa.func.ST_Centroid(t.c.linegeo)).label('x'),
+ sa.func.ST_Y(sa.func.ST_Centroid(t.c.linegeo)).label('y'),
+ _select_column_geometry(t.c.linegeo, details.geometry_output))
+
+ if isinstance(place, ntyp.PlaceID):
+ sql = sql.where(t.c.place_id == place.place_id)
+ else:
+ return None
+
+ return (await conn.execute(sql)).one_or_none()
+
+
+async def find_in_postcode(conn: SearchConnection, place: ntyp.PlaceRef,
+ details: ntyp.LookupDetails) -> Optional[SaRow]:
+ """ Search for the given place in the postcode table and return the
+ base information. Only lookup by place ID is supported.
+ """
+ t = conn.t.postcode
+ sql = sa.select(t.c.place_id, t.c.parent_place_id,
+ t.c.rank_search, t.c.rank_address,
+ t.c.indexed_date, t.c.postcode, t.c.country_code,
+ sa.func.ST_X(t.c.geometry).label('x'),
+ sa.func.ST_Y(t.c.geometry).label('y'),
+ _select_column_geometry(t.c.geometry, details.geometry_output))
+
+ if isinstance(place, ntyp.PlaceID):
+ sql = sql.where(t.c.place_id == place.place_id)
+ else:
+ return None
+
+ return (await conn.execute(sql)).one_or_none()
+
+
+async def get_place_by_id(conn: SearchConnection, place: ntyp.PlaceRef,
+ details: ntyp.LookupDetails) -> Optional[nres.SearchResult]:
+ """ Retrieve a place with additional details from the database.
+ """
+ if details.geometry_output and details.geometry_output != ntyp.GeometryFormat.GEOJSON:
+ raise ValueError("lookup only supports geojosn polygon output.")
+
+ row = await find_in_placex(conn, place, details)
+ if row is not None:
+ result = nres.create_from_placex_row(row)
+ await nres.add_result_details(conn, result, details)
+ return result
+
+ row = await find_in_osmline(conn, place, details)
+ if row is not None:
+ result = nres.create_from_osmline_row(row)
+ await nres.add_result_details(conn, result, details)
+ return result
+
+ row = await find_in_postcode(conn, place, details)
+ if row is not None:
+ result = nres.create_from_postcode_row(row)
+ await nres.add_result_details(conn, result, details)
+ return result
+
+ row = await find_in_tiger(conn, place, details)
+ if row is not None:
+ result = nres.create_from_tiger_row(row)
+ await nres.add_result_details(conn, result, details)
+ return result
+
+ # Nothing found under this ID.
+ return None
#
# This file is part of Nominatim. (https://nominatim.org)
#
-# Copyright (C) 2022 by the Nominatim developer community.
+# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
-Helper classes and function for writing result formatting modules.
+Helper classes and functions for formatting results into API responses.
"""
-from typing import Type, TypeVar, Dict, Mapping, List, Callable, Generic, Any
+from typing import Type, TypeVar, Dict, List, Callable, Any, Mapping
from collections import defaultdict
T = TypeVar('T') # pylint: disable=invalid-name
-FormatFunc = Callable[[T], str]
-
-class ResultFormatter(Generic[T]):
- """ This class dispatches format calls to the appropriate formatting
- function previously defined with the `format_func` decorator.
- """
-
- def __init__(self, funcs: Mapping[str, FormatFunc[T]]) -> None:
- self.functions = funcs
-
-
- def list_formats(self) -> List[str]:
- """ Return a list of formats supported by this formatter.
- """
- return list(self.functions.keys())
-
-
- def supports_format(self, fmt: str) -> bool:
- """ Check if the given format is supported by this formatter.
- """
- return fmt in self.functions
-
-
- def format(self, result: T, fmt: str) -> str:
- """ Convert the given result into a string using the given format.
-
- The format is expected to be in the list returned by
- `list_formats()`.
- """
- return self.functions[fmt](result)
+FormatFunc = Callable[[T, Mapping[str, Any]], str]
class FormatDispatcher:
- """ A factory class for result formatters.
+ """ Helper class to conveniently create formatting functions in
+ a module using decorators.
"""
def __init__(self) -> None:
return decorator
- def __call__(self, result_class: Type[T]) -> ResultFormatter[T]:
- """ Create an instance of a format class for the given result type.
+ def list_formats(self, result_type: Type[Any]) -> List[str]:
+ """ Return a list of formats supported by this formatter.
+ """
+ return list(self.format_functions[result_type].keys())
+
+
+ def supports_format(self, result_type: Type[Any], fmt: str) -> bool:
+ """ Check if the given format is supported by this formatter.
+ """
+ return fmt in self.format_functions[result_type]
+
+
+ def format_result(self, result: Any, fmt: str, options: Mapping[str, Any]) -> str:
+ """ Convert the given result into a string using the given format.
+
+ The format is expected to be in the list returned by
+ `list_formats()`.
"""
- return ResultFormatter(self.format_functions[result_class])
+ return self.format_functions[type(result)][fmt](result, options)
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Dataclasses for search results and helper functions to fill them.
+
+Data classes are part of the public API while the functions are for
+internal use only. That's why they are implemented as free-standing functions
+instead of member functions.
+"""
+from typing import Optional, Tuple, Dict, Sequence
+import enum
+import dataclasses
+import datetime as dt
+
+import sqlalchemy as sa
+
+from nominatim.typing import SaSelect, SaRow
+from nominatim.api.types import Point, LookupDetails
+from nominatim.api.connection import SearchConnection
+
+# This file defines complex result data classes.
+# pylint: disable=too-many-instance-attributes
+
+class SourceTable(enum.Enum):
+ """ Enumeration of kinds of results.
+ """
+ PLACEX = 1
+ OSMLINE = 2
+ TIGER = 3
+ POSTCODE = 4
+ COUNTRY = 5
+
+
+@dataclasses.dataclass
+class AddressLine:
+ """ Detailed information about a related place.
+ """
+ place_id: Optional[int]
+ osm_object: Optional[Tuple[str, int]]
+ category: Tuple[str, str]
+ names: Dict[str, str]
+ extratags: Optional[Dict[str, str]]
+
+ admin_level: Optional[int]
+ fromarea: bool
+ isaddress: bool
+ rank_address: int
+ distance: float
+
+
+AddressLines = Sequence[AddressLine]
+
+
+@dataclasses.dataclass
+class WordInfo:
+ """ Detailed information about a search term.
+ """
+ word_id: int
+ word_token: str
+ word: Optional[str] = None
+
+
+WordInfos = Sequence[WordInfo]
+
+
+@dataclasses.dataclass
+class SearchResult:
+ """ Data class collecting all available information about a search result.
+ """
+ source_table: SourceTable
+ category: Tuple[str, str]
+ centroid: Point
+
+ place_id: Optional[int] = None
+ parent_place_id: Optional[int] = None
+ linked_place_id: Optional[int] = None
+ osm_object: Optional[Tuple[str, int]] = None
+ admin_level: int = 15
+
+ names: Optional[Dict[str, str]] = None
+ address: Optional[Dict[str, str]] = None
+ extratags: Optional[Dict[str, str]] = None
+
+ housenumber: Optional[str] = None
+ postcode: Optional[str] = None
+ wikipedia: Optional[str] = None
+
+ rank_address: int = 30
+ rank_search: int = 30
+ importance: Optional[float] = None
+
+ country_code: Optional[str] = None
+
+ indexed_date: Optional[dt.datetime] = None
+
+ address_rows: Optional[AddressLines] = None
+ linked_rows: Optional[AddressLines] = None
+ parented_rows: Optional[AddressLines] = None
+ name_keywords: Optional[WordInfos] = None
+ address_keywords: Optional[WordInfos] = None
+
+ geometry: Dict[str, str] = dataclasses.field(default_factory=dict)
+
+ def __post_init__(self) -> None:
+ if self.indexed_date is not None and self.indexed_date.tzinfo is None:
+ self.indexed_date = self.indexed_date.replace(tzinfo=dt.timezone.utc)
+
+ @property
+ def lat(self) -> float:
+ """ Get the latitude (or y) of the center point of the place.
+ """
+ return self.centroid[1]
+
+
+ @property
+ def lon(self) -> float:
+ """ Get the longitude (or x) of the center point of the place.
+ """
+ return self.centroid[0]
+
+
+ def calculated_importance(self) -> float:
+ """ Get a valid importance value. This is either the stored importance
+ of the value or an artificial value computed from the place's
+ search rank.
+ """
+ return self.importance or (0.7500001 - (self.rank_search/40.0))
+
+
+ # pylint: disable=consider-using-f-string
+ def centroid_as_geojson(self) -> str:
+ """ Get the centroid in GeoJSON format.
+ """
+ return '{"type": "Point","coordinates": [%f, %f]}' % self.centroid
+
+
+def _filter_geometries(row: SaRow) -> Dict[str, str]:
+ return {k[9:]: v for k, v in row._mapping.items() # pylint: disable=W0212
+ if k.startswith('geometry_')}
+
+
+def create_from_placex_row(row: SaRow) -> SearchResult:
+ """ Construct a new SearchResult and add the data from the result row
+ from the placex table.
+ """
+ return SearchResult(source_table=SourceTable.PLACEX,
+ place_id=row.place_id,
+ parent_place_id=row.parent_place_id,
+ linked_place_id=row.linked_place_id,
+ osm_object=(row.osm_type, row.osm_id),
+ category=(row.class_, row.type),
+ admin_level=row.admin_level,
+ names=row.name,
+ address=row.address,
+ extratags=row.extratags,
+ housenumber=row.housenumber,
+ postcode=row.postcode,
+ wikipedia=row.wikipedia,
+ rank_address=row.rank_address,
+ rank_search=row.rank_search,
+ importance=row.importance,
+ country_code=row.country_code,
+ indexed_date=getattr(row, 'indexed_date'),
+ centroid=Point(row.x, row.y),
+ geometry=_filter_geometries(row))
+
+
+def create_from_osmline_row(row: SaRow) -> SearchResult:
+ """ Construct a new SearchResult and add the data from the result row
+ from the osmline table.
+ """
+ return SearchResult(source_table=SourceTable.OSMLINE,
+ place_id=row.place_id,
+ parent_place_id=row.parent_place_id,
+ osm_object=('W', row.osm_id),
+ category=('place', 'houses'),
+ address=row.address,
+ postcode=row.postcode,
+ extratags={'startnumber': str(row.startnumber),
+ 'endnumber': str(row.endnumber),
+ 'step': str(row.step)},
+ country_code=row.country_code,
+ indexed_date=getattr(row, 'indexed_date'),
+ centroid=Point(row.x, row.y),
+ geometry=_filter_geometries(row))
+
+
+def create_from_tiger_row(row: SaRow) -> SearchResult:
+ """ Construct a new SearchResult and add the data from the result row
+ from the Tiger table.
+ """
+ return SearchResult(source_table=SourceTable.TIGER,
+ place_id=row.place_id,
+ parent_place_id=row.parent_place_id,
+ category=('place', 'houses'),
+ postcode=row.postcode,
+ extratags={'startnumber': str(row.startnumber),
+ 'endnumber': str(row.endnumber),
+ 'step': str(row.step)},
+ country_code='us',
+ centroid=Point(row.x, row.y),
+ geometry=_filter_geometries(row))
+
+
+def create_from_postcode_row(row: SaRow) -> SearchResult:
+ """ Construct a new SearchResult and add the data from the result row
+ from the postcode centroid table.
+ """
+ return SearchResult(source_table=SourceTable.POSTCODE,
+ place_id=row.place_id,
+ parent_place_id=row.parent_place_id,
+ category=('place', 'postcode'),
+ names={'ref': row.postcode},
+ rank_search=row.rank_search,
+ rank_address=row.rank_address,
+ country_code=row.country_code,
+ centroid=Point(row.x, row.y),
+ indexed_date=row.indexed_date,
+ geometry=_filter_geometries(row))
+
+
+async def add_result_details(conn: SearchConnection, result: SearchResult,
+ details: LookupDetails) -> None:
+ """ Retrieve more details from the database according to the
+ parameters specified in 'details'.
+ """
+ if details.address_details:
+ await complete_address_details(conn, result)
+ if details.linked_places:
+ await complete_linked_places(conn, result)
+ if details.parented_places:
+ await complete_parented_places(conn, result)
+ if details.keywords:
+ await complete_keywords(conn, result)
+
+
+def _result_row_to_address_row(row: SaRow) -> AddressLine:
+ """ Create a new AddressLine from the results of a datbase query.
+ """
+ extratags: Dict[str, str] = getattr(row, 'extratags', {})
+ if 'place_type' in row:
+ extratags['place_type'] = row.place_type
+
+ names = row.name
+ if getattr(row, 'housenumber', None) is not None:
+ if names is None:
+ names = {}
+ names['housenumber'] = row.housenumber
+
+ return AddressLine(place_id=row.place_id,
+ osm_object=None if row.osm_type is None else (row.osm_type, row.osm_id),
+ category=(getattr(row, 'class'), row.type),
+ names=names,
+ extratags=extratags,
+ admin_level=row.admin_level,
+ fromarea=row.fromarea,
+ isaddress=getattr(row, 'isaddress', True),
+ rank_address=row.rank_address,
+ distance=row.distance)
+
+
+async def complete_address_details(conn: SearchConnection, result: SearchResult) -> None:
+ """ Retrieve information about places that make up the address of the result.
+ """
+ housenumber = -1
+ if result.source_table in (SourceTable.TIGER, SourceTable.OSMLINE):
+ if result.housenumber is not None:
+ housenumber = int(result.housenumber)
+ elif result.extratags is not None and 'startnumber' in result.extratags:
+ # details requests do not come with a specific house number
+ housenumber = int(result.extratags['startnumber'])
+
+ sfn = sa.func.get_addressdata(result.place_id, housenumber)\
+ .table_valued( # type: ignore[no-untyped-call]
+ sa.column('place_id', type_=sa.Integer),
+ 'osm_type',
+ sa.column('osm_id', type_=sa.BigInteger),
+ sa.column('name', type_=conn.t.types.Composite),
+ 'class', 'type', 'place_type',
+ sa.column('admin_level', type_=sa.Integer),
+ sa.column('fromarea', type_=sa.Boolean),
+ sa.column('isaddress', type_=sa.Boolean),
+ sa.column('rank_address', type_=sa.SmallInteger),
+ sa.column('distance', type_=sa.Float))
+ sql = sa.select(sfn).order_by(sa.column('rank_address').desc(),
+ sa.column('isaddress').desc())
+
+ result.address_rows = []
+ for row in await conn.execute(sql):
+ result.address_rows.append(_result_row_to_address_row(row))
+
+# pylint: disable=consider-using-f-string
+def _placex_select_address_row(conn: SearchConnection,
+ centroid: Point) -> SaSelect:
+ t = conn.t.placex
+ return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
+ t.c.class_.label('class'), t.c.type,
+ t.c.admin_level, t.c.housenumber,
+ sa.literal_column("""ST_GeometryType(geometry) in
+ ('ST_Polygon','ST_MultiPolygon')""").label('fromarea'),
+ t.c.rank_address,
+ sa.literal_column(
+ """ST_DistanceSpheroid(geometry, 'SRID=4326;POINT(%f %f)'::geometry,
+ 'SPHEROID["WGS 84",6378137,298.257223563, AUTHORITY["EPSG","7030"]]')
+ """ % centroid).label('distance'))
+
+
+async def complete_linked_places(conn: SearchConnection, result: SearchResult) -> None:
+ """ Retrieve information about places that link to the result.
+ """
+ result.linked_rows = []
+ if result.source_table != SourceTable.PLACEX:
+ return
+
+ sql = _placex_select_address_row(conn, result.centroid)\
+ .where(conn.t.placex.c.linked_place_id == result.place_id)
+
+ for row in await conn.execute(sql):
+ result.linked_rows.append(_result_row_to_address_row(row))
+
+
+async def complete_keywords(conn: SearchConnection, result: SearchResult) -> None:
+ """ Retrieve information about the search terms used for this place.
+ """
+ t = conn.t.search_name
+ sql = sa.select(t.c.name_vector, t.c.nameaddress_vector)\
+ .where(t.c.place_id == result.place_id)
+
+ result.name_keywords = []
+ result.address_keywords = []
+ for name_tokens, address_tokens in await conn.execute(sql):
+ t = conn.t.word
+ sel = sa.select(t.c.word_id, t.c.word_token, t.c.word)
+
+ for row in await conn.execute(sel.where(t.c.word_id == sa.any_(name_tokens))):
+ result.name_keywords.append(WordInfo(*row))
+
+ for row in await conn.execute(sel.where(t.c.word_id == sa.any_(address_tokens))):
+ result.address_keywords.append(WordInfo(*row))
+
+
+async def complete_parented_places(conn: SearchConnection, result: SearchResult) -> None:
+ """ Retrieve information about places that the result provides the
+ address for.
+ """
+ result.parented_rows = []
+ if result.source_table != SourceTable.PLACEX:
+ return
+
+ sql = _placex_select_address_row(conn, result.centroid)\
+ .where(conn.t.placex.c.parent_place_id == result.place_id)\
+ .where(conn.t.placex.c.rank_search == 30)
+
+ for row in await conn.execute(sql):
+ result.parented_rows.append(_result_row_to_address_row(row))
--- /dev/null
+# SPDX-License-Identifier: GPL-2.0-only
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Classes and functions related to the status call.
+"""
+from typing import Optional
+import datetime as dt
+import dataclasses
+
+import sqlalchemy as sa
+
+from nominatim.api.connection import SearchConnection
+from nominatim import version
+
+@dataclasses.dataclass
+class StatusResult:
+ """ Result of a call to the status API.
+ """
+ status: int
+ message: str
+ software_version = version.NOMINATIM_VERSION
+ data_updated: Optional[dt.datetime] = None
+ database_version: Optional[version.NominatimVersion] = None
+
+
+async def get_status(conn: SearchConnection) -> StatusResult:
+ """ Execute a status API call.
+ """
+ status = StatusResult(0, 'OK')
+
+ # Last update date
+ sql = sa.select(conn.t.import_status.c.lastimportdate).limit(1)
+ status.data_updated = await conn.scalar(sql)
+
+ # Database version
+ try:
+ verstr = await conn.get_property('database_version')
+ status.database_version = version.parse_version(verstr)
+ except ValueError:
+ pass
+
+ return status
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Complex datatypes used by the Nominatim API.
+"""
+from typing import Optional, Union, NamedTuple
+import dataclasses
+import enum
+
+@dataclasses.dataclass
+class PlaceID:
+ """ Reference an object by Nominatim's internal ID.
+ """
+ place_id: int
+
+
+@dataclasses.dataclass
+class OsmID:
+ """ Reference by the OSM ID and potentially the basic category.
+ """
+ osm_type: str
+ osm_id: int
+ osm_class: Optional[str] = None
+
+ def __post_init__(self) -> None:
+ if self.osm_type not in ('N', 'W', 'R'):
+ raise ValueError(f"Illegal OSM type '{self.osm_type}'. Must be one of N, W, R.")
+
+
+PlaceRef = Union[PlaceID, OsmID]
+
+
+class Point(NamedTuple):
+ """ A geographic point in WGS84 projection.
+ """
+ x: float
+ y: float
+
+
+ @property
+ def lat(self) -> float:
+ """ Return the latitude of the point.
+ """
+ return self.y
+
+
+ @property
+ def lon(self) -> float:
+ """ Return the longitude of the point.
+ """
+ return self.x
+
+
+class GeometryFormat(enum.Flag):
+ """ Geometry output formats supported by Nominatim.
+ """
+ NONE = 0
+ GEOJSON = enum.auto()
+ KML = enum.auto()
+ SVG = enum.auto()
+ TEXT = enum.auto()
+
+
+@dataclasses.dataclass
+class LookupDetails:
+ """ Collection of parameters that define the amount of details
+ returned with a search result.
+ """
+ geometry_output: GeometryFormat = GeometryFormat.NONE
+ """ Add the full geometry of the place to the result. Multiple
+ formats may be selected. Note that geometries can become quite large.
+ """
+ address_details: bool = False
+ """ Get detailed information on the places that make up the address
+ for the result.
+ """
+ linked_places: bool = False
+ """ Get detailed information on the places that link to the result.
+ """
+ parented_places: bool = False
+ """ Get detailed information on all places that this place is a parent
+ for, i.e. all places for which it provides the address details.
+ Only POI places can have parents.
+ """
+ keywords: bool = False
+ """ Add information about the search terms used for this place.
+ """
--- /dev/null
+# SPDX-License-Identifier: GPL-2.0-only
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Implementation of API version v1 (aka the legacy version).
+"""
+
+#pylint: disable=useless-import-alias
+
+from nominatim.api.v1.server_glue import (ASGIAdaptor as ASGIAdaptor,
+ EndpointFunc as EndpointFunc,
+ ROUTES as ROUTES)
+
+import nominatim.api.v1.format as _format
+
+list_formats = _format.dispatch.list_formats
+supports_format = _format.dispatch.supports_format
+format_result = _format.dispatch.format_result
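+
+# Usage sketch (illustrative only): formatting a status result for the v1 API
+#
+#   from nominatim.api import StatusResult
+#   from nominatim.api.v1 import format_result, supports_format
+#
+#   if supports_format(StatusResult, 'json'):
+#       body = format_result(StatusResult(0, 'OK'), 'json', {})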
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Hard-coded information about tag categories.
+
+These tables have been copied verbatim from the old PHP code. For future
+versions a more flexible formatting is required.
+"""
+
+ICONS = {
+ ('boundary', 'administrative'): 'poi_boundary_administrative',
+ ('place', 'city'): 'poi_place_city',
+ ('place', 'town'): 'poi_place_town',
+ ('place', 'village'): 'poi_place_village',
+ ('place', 'hamlet'): 'poi_place_village',
+ ('place', 'suburb'): 'poi_place_village',
+ ('place', 'locality'): 'poi_place_village',
+ ('place', 'airport'): 'transport_airport2',
+ ('aeroway', 'aerodrome'): 'transport_airport2',
+ ('railway', 'station'): 'transport_train_station2',
+ ('amenity', 'place_of_worship'): 'place_of_worship_unknown3',
+ ('amenity', 'pub'): 'food_pub',
+ ('amenity', 'bar'): 'food_bar',
+ ('amenity', 'university'): 'education_university',
+ ('tourism', 'museum'): 'tourist_museum',
+ ('amenity', 'arts_centre'): 'tourist_art_gallery2',
+ ('tourism', 'zoo'): 'tourist_zoo',
+ ('tourism', 'theme_park'): 'poi_point_of_interest',
+ ('tourism', 'attraction'): 'poi_point_of_interest',
+ ('leisure', 'golf_course'): 'sport_golf',
+ ('historic', 'castle'): 'tourist_castle',
+ ('amenity', 'hospital'): 'health_hospital',
+ ('amenity', 'school'): 'education_school',
+ ('amenity', 'theatre'): 'tourist_theatre',
+ ('amenity', 'library'): 'amenity_library',
+ ('amenity', 'fire_station'): 'amenity_firestation3',
+ ('amenity', 'police'): 'amenity_police2',
+ ('amenity', 'bank'): 'money_bank2',
+ ('amenity', 'post_office'): 'amenity_post_office',
+ ('tourism', 'hotel'): 'accommodation_hotel2',
+ ('amenity', 'cinema'): 'tourist_cinema',
+ ('tourism', 'artwork'): 'tourist_art_gallery2',
+ ('historic', 'archaeological_site'): 'tourist_archaeological2',
+ ('amenity', 'doctors'): 'health_doctors',
+ ('leisure', 'sports_centre'): 'sport_leisure_centre',
+ ('leisure', 'swimming_pool'): 'sport_swimming_outdoor',
+ ('shop', 'supermarket'): 'shopping_supermarket',
+ ('shop', 'convenience'): 'shopping_convenience',
+ ('amenity', 'restaurant'): 'food_restaurant',
+ ('amenity', 'fast_food'): 'food_fastfood',
+ ('amenity', 'cafe'): 'food_cafe',
+ ('tourism', 'guest_house'): 'accommodation_bed_and_breakfast',
+ ('amenity', 'pharmacy'): 'health_pharmacy_dispensing',
+ ('amenity', 'fuel'): 'transport_fuel',
+ ('natural', 'peak'): 'poi_peak',
+ ('natural', 'wood'): 'landuse_coniferous_and_deciduous',
+ ('shop', 'bicycle'): 'shopping_bicycle',
+ ('shop', 'clothes'): 'shopping_clothes',
+ ('shop', 'hairdresser'): 'shopping_hairdresser',
+ ('shop', 'doityourself'): 'shopping_diy',
+ ('shop', 'estate_agent'): 'shopping_estateagent2',
+ ('shop', 'car'): 'shopping_car',
+ ('shop', 'garden_centre'): 'shopping_garden_centre',
+ ('shop', 'car_repair'): 'shopping_car_repair',
+ ('shop', 'bakery'): 'shopping_bakery',
+ ('shop', 'butcher'): 'shopping_butcher',
+ ('shop', 'apparel'): 'shopping_clothes',
+ ('shop', 'laundry'): 'shopping_laundrette',
+ ('shop', 'beverages'): 'shopping_alcohol',
+ ('shop', 'alcohol'): 'shopping_alcohol',
+ ('shop', 'optician'): 'health_opticians',
+ ('shop', 'chemist'): 'health_pharmacy',
+ ('shop', 'gallery'): 'tourist_art_gallery2',
+ ('shop', 'jewelry'): 'shopping_jewelry',
+ ('tourism', 'information'): 'amenity_information',
+ ('historic', 'ruins'): 'tourist_ruin',
+ ('amenity', 'college'): 'education_school',
+ ('historic', 'monument'): 'tourist_monument',
+ ('historic', 'memorial'): 'tourist_monument',
+ ('historic', 'mine'): 'poi_mine',
+ ('tourism', 'caravan_site'): 'accommodation_caravan_park',
+ ('amenity', 'bus_station'): 'transport_bus_station',
+ ('amenity', 'atm'): 'money_atm2',
+ ('tourism', 'viewpoint'): 'tourist_view_point',
+ ('tourism', 'guesthouse'): 'accommodation_bed_and_breakfast',
+ ('railway', 'tram'): 'transport_tram_stop',
+ ('amenity', 'courthouse'): 'amenity_court',
+ ('amenity', 'recycling'): 'amenity_recycling',
+ ('amenity', 'dentist'): 'health_dentist',
+ ('natural', 'beach'): 'tourist_beach',
+ ('railway', 'tram_stop'): 'transport_tram_stop',
+ ('amenity', 'prison'): 'amenity_prison',
+ ('highway', 'bus_stop'): 'transport_bus_stop2'
+}
--- /dev/null
+# SPDX-License-Identifier: GPL-2.0-only
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Output formatters for API version v1.
+"""
+from typing import Mapping, Any
+import collections
+
+import nominatim.api as napi
+from nominatim.api.result_formatting import FormatDispatcher
+from nominatim.api.v1.classtypes import ICONS
+from nominatim.utils.json_writer import JsonWriter
+
+dispatch = FormatDispatcher()
+
+@dispatch.format_func(napi.StatusResult, 'text')
+def _format_status_text(result: napi.StatusResult, _: Mapping[str, Any]) -> str:
+ if result.status:
+ return f"ERROR: {result.message}"
+
+ return 'OK'
+
+
+@dispatch.format_func(napi.StatusResult, 'json')
+def _format_status_json(result: napi.StatusResult, _: Mapping[str, Any]) -> str:
+ out = JsonWriter()
+
+ out.start_object()\
+ .keyval('status', result.status)\
+ .keyval('message', result.message)\
+ .keyval_not_none('data_updated', result.data_updated,
+ lambda v: v.isoformat())\
+ .keyval('software_version', str(result.software_version))\
+ .keyval_not_none('database_version', result.database_version, str)\
+ .end_object()
+
+ return out()
+
+
+def _add_address_row(writer: JsonWriter, row: napi.AddressLine,
+ locales: napi.Locales) -> None:
+ writer.start_object()\
+ .keyval('localname', locales.display_name(row.names))\
+ .keyval_not_none('place_id', row.place_id)
+
+ if row.osm_object is not None:
+ writer.keyval('osm_id', row.osm_object[1])\
+ .keyval('osm_type', row.osm_object[0])
+
+ if row.extratags:
+ writer.keyval_not_none('place_type', row.extratags.get('place_type'))
+
+ writer.keyval('class', row.category[0])\
+ .keyval('type', row.category[1])\
+ .keyval_not_none('admin_level', row.admin_level)\
+ .keyval('rank_address', row.rank_address)\
+ .keyval('distance', row.distance)\
+ .keyval('isaddress', row.isaddress)\
+ .end_object()
+
+
+def _add_address_rows(writer: JsonWriter, section: str, rows: napi.AddressLines,
+ locales: napi.Locales) -> None:
+ writer.key(section).start_array()
+ for row in rows:
+ _add_address_row(writer, row, locales)
+ writer.next()
+ writer.end_array().next()
+
+
+def _add_parent_rows_grouped(writer: JsonWriter, rows: napi.AddressLines,
+ locales: napi.Locales) -> None:
+ # group by category type
+ data = collections.defaultdict(list)
+ for row in rows:
+ sub = JsonWriter()
+ _add_address_row(sub, row, locales)
+ data[row.category[1]].append(sub())
+
+ writer.key('hierarchy').start_object()
+ for group, grouped in data.items():
+ writer.key(group).start_array()
+ grouped.sort() # sorts alphabetically by local name
+ for line in grouped:
+ writer.raw(line).next()
+ writer.end_array().next()
+
+ writer.end_object().next()
+
+
+@dispatch.format_func(napi.SearchResult, 'details-json')
+def _format_search_json(result: napi.SearchResult, options: Mapping[str, Any]) -> str:
+ locales = options.get('locales', napi.Locales())
+ geom = result.geometry.get('geojson')
+ centroid = result.centroid_as_geojson()
+
+ out = JsonWriter()
+ out.start_object()\
+ .keyval_not_none('place_id', result.place_id)\
+ .keyval_not_none('parent_place_id', result.parent_place_id)
+
+ if result.osm_object is not None:
+ out.keyval('osm_type', result.osm_object[0])\
+ .keyval('osm_id', result.osm_object[1])
+
+ out.keyval('category', result.category[0])\
+ .keyval('type', result.category[1])\
+ .keyval('admin_level', result.admin_level)\
+ .keyval('localname', locales.display_name(result.names))\
+ .keyval_not_none('names', result.names or None)\
+ .keyval_not_none('addresstags', result.address or None)\
+ .keyval_not_none('housenumber', result.housenumber)\
+ .keyval_not_none('calculated_postcode', result.postcode)\
+ .keyval_not_none('country_code', result.country_code)\
+ .keyval_not_none('indexed_date', result.indexed_date, lambda v: v.isoformat())\
+ .keyval_not_none('importance', result.importance)\
+ .keyval('calculated_importance', result.calculated_importance())\
+ .keyval_not_none('extratags', result.extratags or None)\
+ .keyval_not_none('calculated_wikipedia', result.wikipedia)\
+ .keyval('rank_address', result.rank_address)\
+ .keyval('rank_search', result.rank_search)\
+ .keyval('isarea', 'Polygon' in (geom or result.geometry.get('type') or ''))\
+ .key('centroid').raw(centroid).next()\
+ .key('geometry').raw(geom or centroid).next()
+
+ if options.get('icon_base_url', None):
+ icon = ICONS.get(result.category)
+ if icon:
+ out.keyval('icon', f"{options['icon_base_url']}/{icon}.p.20.png")
+
+ if result.address_rows is not None:
+ _add_address_rows(out, 'address', result.address_rows, locales)
+
+ if result.linked_rows is not None:
+ _add_address_rows(out, 'linked_places', result.linked_rows, locales)
+
+ if result.name_keywords is not None or result.address_keywords is not None:
+ out.key('keywords').start_object()
+
+ for sec, klist in (('name', result.name_keywords), ('address', result.address_keywords)):
+ out.key(sec).start_array()
+ for word in (klist or []):
+ out.start_object()\
+ .keyval('id', word.word_id)\
+ .keyval('token', word.word_token)\
+ .end_object().next()
+ out.end_array().next()
+
+ out.end_object().next()
+
+ if result.parented_rows is not None:
+ if options.get('group_hierarchy', False):
+ _add_parent_rows_grouped(out, result.parented_rows, locales)
+ else:
+ _add_address_rows(out, 'hierarchy', result.parented_rows, locales)
+
+ out.end_object()
+
+ return out()
--- /dev/null
+# SPDX-License-Identifier: GPL-2.0-only
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Generic part of the server implementation of the v1 API.
+Combine with the scaffolding provided for the various Python ASGI frameworks.
+"""
+from typing import Optional, Any, Type, Callable
+import abc
+
+from nominatim.config import Configuration
+import nominatim.api as napi
+from nominatim.api.v1.format import dispatch as formatting
+
+CONTENT_TYPE = {
+ 'text': 'text/plain; charset=utf-8',
+ 'xml': 'text/xml; charset=utf-8',
+ 'jsonp': 'application/javascript'
+}
+
+
+class ASGIAdaptor(abc.ABC):
+ """ Adapter class for the different ASGI frameworks.
+ Wraps access to the concrete requests and responses of the framework.
+ """
+
+ @abc.abstractmethod
+ def get(self, name: str, default: Optional[str] = None) -> Optional[str]:
+ """ Return an input parameter as a string. If the parameter was
+ not provided, return the 'default' value.
+ """
+
+ @abc.abstractmethod
+ def get_header(self, name: str, default: Optional[str] = None) -> Optional[str]:
+ """ Return an HTTP header parameter as a string. If the parameter was
+ not provided, return the 'default' value.
+ """
+
+
+ @abc.abstractmethod
+ def error(self, msg: str, status: int = 400) -> Exception:
+ """ Construct an appropriate exception from the given error message.
+ The exception must result in a HTTP error with the given status.
+ """
+
+
+ @abc.abstractmethod
+ def create_response(self, status: int, output: str, content_type: str) -> Any:
+ """ Create a response from the given parameters. The result will
+ be returned by the endpoint functions. The adaptor may also
+ return None when the response is created internally by some
+ other means.
+
+ The response must use the given HTTP status code 'status', set
+ the HTTP content-type header to the string provided and the
+ body of the response to 'output'.
+ """
+
+
+ @abc.abstractmethod
+ def config(self) -> Configuration:
+ """ Return the current configuration object.
+ """
+
+
+ def build_response(self, output: str, media_type: str, status: int = 200) -> Any:
+ """ Create a response from the given output. Wraps a JSONP function
+ around the response, if necessary.
+ """
+ if media_type == 'json' and status == 200:
+ jsonp = self.get('json_callback')
+ if jsonp is not None:
+ if any(not part.isidentifier() for part in jsonp.split('.')):
+ raise self.error('Invalid json_callback value')
+ output = f"{jsonp}({output})"
+ media_type = 'jsonp'
+
+ return self.create_response(status, output,
+ CONTENT_TYPE.get(media_type, 'application/json'))
+
+
+ def get_int(self, name: str, default: Optional[int] = None) -> int:
+ """ Return an input parameter as an int. Raises an exception if
+ the parameter is given but not in an integer format.
+
+ If 'default' is given, then it will be returned when the parameter
+ is missing completely. When 'default' is None, an error will be
+ raised on a missing parameter.
+ """
+ value = self.get(name)
+
+ if value is None:
+ if default is not None:
+ return default
+
+ raise self.error(f"Parameter '{name}' missing.")
+
+ try:
+ return int(value)
+ except ValueError as exc:
+ raise self.error(f"Parameter '{name}' must be a number.") from exc
+
+
+ def get_bool(self, name: str, default: Optional[bool] = None) -> bool:
+ """ Return an input parameter as a bool. Only '0' is accepted as
+ an input for 'false'; all other inputs are interpreted as 'true'.
+
+ If 'default' is given, then it will be returned when the parameter
+ is missing completely. When 'default' is None, an error will be
+ raised on a missing parameter.
+ """
+ value = self.get(name)
+
+ if value is None:
+ if default is not None:
+ return default
+
+ raise self.error(f"Parameter '{name}' missing.")
+
+ return value != '0'
+
+
+ def get_accepted_languages(self) -> str:
+ """ Return the accepted languages.
+ """
+ return self.get('accept-language')\
+ or self.get_header('http_accept_language')\
+ or self.config().DEFAULT_LANGUAGE
+
+
+def parse_format(params: ASGIAdaptor, result_type: Type[Any], default: str) -> str:
+ """ Get and check the 'format' parameter. `result_type` describes
+ the expected return type and `default` the format value to assume
+ when no parameter is present.
+ """
+ fmt = params.get('format', default=default)
+ assert fmt is not None
+
+ if not formatting.supports_format(result_type, fmt):
+ raise params.error("Parameter 'format' must be one of: " +
+ ', '.join(formatting.list_formats(result_type)))
+
+ return fmt
+
+
+async def status_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> Any:
+ """ Server glue for /status endpoint. See API docs for details.
+ """
+ result = await api.status()
+
+ fmt = parse_format(params, napi.StatusResult, 'text')
+
+ if fmt == 'text' and result.status:
+ status_code = 500
+ else:
+ status_code = 200
+
+ return params.build_response(formatting.format_result(result, fmt, {}), fmt,
+ status=status_code)
+
+
+async def details_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> Any:
+ """ Server glue for /details endpoint. See API docs for details.
+ """
+ place_id = params.get_int('place_id', 0)
+ place: napi.PlaceRef
+ if place_id:
+ place = napi.PlaceID(place_id)
+ else:
+ osmtype = params.get('osmtype')
+ if osmtype is None:
+ raise params.error("Missing ID parameter 'place_id' or 'osmtype'.")
+ place = napi.OsmID(osmtype, params.get_int('osmid'), params.get('class'))
+
+ details = napi.LookupDetails(address_details=params.get_bool('addressdetails', False),
+ linked_places=params.get_bool('linkedplaces', False),
+ parented_places=params.get_bool('hierarchy', False),
+ keywords=params.get_bool('keywords', False))
+
+ if params.get_bool('polygon_geojson', False):
+ details.geometry_output = napi.GeometryFormat.GEOJSON
+
+ locales = napi.Locales.from_accept_languages(params.get_accepted_languages())
+
+ result = await api.lookup(place, details)
+
+ if result is None:
+ raise params.error('No place with that OSM ID found.', status=404)
+
+ output = formatting.format_result(
+ result,
+ 'details-json',
+ {'locales': locales,
+ 'group_hierarchy': params.get_bool('group_hierarchy', False),
+ 'icon_base_url': params.config().MAPICON_URL})
+
+ return params.build_response(output, 'json')
+
+
+EndpointFunc = Callable[[napi.NominatimAPIAsync, ASGIAdaptor], Any]
+
+ROUTES = [
+ ('status', status_endpoint),
+ ('details', details_endpoint)
+]
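A concrete adaptor only has to map the five abstract methods onto its framework. As a rough, test-only sketch (hypothetical, not part of this patch), an adaptor backed by a plain dict could look like this:

    # Hypothetical dict-backed adaptor, for illustration only.
    from typing import Any, Mapping, Optional

    class DictAdaptor(ASGIAdaptor):
        """ Reads parameters from a dict and records the response tuple,
            so the endpoint functions can be driven without a web server.
        """

        def __init__(self, params: Mapping[str, str], config: Configuration) -> None:
            self.params = params
            self._config = config
            self.response: Any = None

        def get(self, name: str, default: Optional[str] = None) -> Optional[str]:
            return self.params.get(name, default)

        def get_header(self, name: str, default: Optional[str] = None) -> Optional[str]:
            return default            # this sketch carries no request headers

        def error(self, msg: str, status: int = 400) -> Exception:
            return RuntimeError(msg)  # any exception type is sufficient offline

        def create_response(self, status: int, output: str, content_type: str) -> Any:
            self.response = (status, output, content_type)

        def config(self) -> Configuration:
            return self._config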
+++ /dev/null
-# SPDX-License-Identifier: GPL-2.0-only
-#
-# This file is part of Nominatim. (https://nominatim.org)
-#
-# Copyright (C) 2022 by the Nominatim developer community.
-# For a full list of authors see the git log.
-"""
-Classes and function releated to status call.
-"""
-from typing import Optional, cast
-import datetime as dt
-
-import sqlalchemy as sqla
-from sqlalchemy.ext.asyncio.engine import AsyncEngine, AsyncConnection
-import asyncpg
-
-from nominatim import version
-
-class StatusResult:
- """ Result of a call to the status API.
- """
-
- def __init__(self, status: int, msg: str):
- self.status = status
- self.message = msg
- self.software_version = version.NOMINATIM_VERSION
- self.data_updated: Optional[dt.datetime] = None
- self.database_version: Optional[version.NominatimVersion] = None
-
-
-async def _get_database_date(conn: AsyncConnection) -> Optional[dt.datetime]:
- """ Query the database date.
- """
- sql = sqla.text('SELECT lastimportdate FROM import_status LIMIT 1')
- result = await conn.execute(sql)
-
- for row in result:
- return cast(dt.datetime, row[0])
-
- return None
-
-
-async def _get_database_version(conn: AsyncConnection) -> Optional[version.NominatimVersion]:
- sql = sqla.text("""SELECT value FROM nominatim_properties
- WHERE property = 'database_version'""")
- result = await conn.execute(sql)
-
- for row in result:
- return version.parse_version(cast(str, row[0]))
-
- return None
-
-
-async def get_status(engine: AsyncEngine) -> StatusResult:
- """ Execute a status API call.
- """
- status = StatusResult(0, 'OK')
- try:
- async with engine.begin() as conn:
- status.data_updated = await _get_database_date(conn)
- status.database_version = await _get_database_version(conn)
- except asyncpg.PostgresError:
- return StatusResult(700, 'Database connection failed')
-
- return status
server_module = importlib.import_module('nominatim.server.sanic.server')
app = server_module.get_application(args.project_dir)
- app.run(host=host, port=port, debug=True)
+ app.run(host=host, port=port, debug=True, single_process=True)
else:
import uvicorn # pylint: disable=import-outside-toplevel
#
# This file is part of Nominatim. (https://nominatim.org)
#
-# Copyright (C) 2022 by the Nominatim developer community.
+# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Subcommand definitions for API calls from the command line.
from typing import Mapping, Dict
import argparse
import logging
+import json
+import sys
from nominatim.tools.exec_utils import run_api_script
from nominatim.errors import UsageError
from nominatim.clicmd.args import NominatimArgs
-from nominatim.api import NominatimAPI
-from nominatim.apicmd.status import StatusResult
-import nominatim.result_formatter.v1 as formatting
+import nominatim.api as napi
+import nominatim.api.v1 as api_output
# Do not repeat documentation of subcommand classes.
# pylint: disable=C0111
('namedetails', 'Include a list of alternative names')
)
-DETAILS_SWITCHES = (
- ('addressdetails', 'Include a breakdown of the address into elements'),
- ('keywords', 'Include a list of name keywords and address keywords'),
- ('linkedplaces', 'Include a details of places that are linked with this one'),
- ('hierarchy', 'Include details of places lower in the address hierarchy'),
- ('group_hierarchy', 'Group the places by type'),
- ('polygon_geojson', 'Include geometry of result')
-)
-
def _add_api_output_arguments(parser: argparse.ArgumentParser) -> None:
group = parser.add_argument_group('Output arguments')
group.add_argument('--format', default='jsonv2',
"of the same object."))
group = parser.add_argument_group('Output arguments')
- for name, desc in DETAILS_SWITCHES:
- group.add_argument('--' + name, action='store_true', help=desc)
+ group.add_argument('--addressdetails', action='store_true',
+ help='Include a breakdown of the address into elements')
+ group.add_argument('--keywords', action='store_true',
+ help='Include a list of name keywords and address keywords')
+ group.add_argument('--linkedplaces', action='store_true',
+ help='Include details of places that are linked with this one')
+ group.add_argument('--hierarchy', action='store_true',
+ help='Include details of places lower in the address hierarchy')
+ group.add_argument('--group_hierarchy', action='store_true',
+ help='Group the places by type')
+ group.add_argument('--polygon_geojson', action='store_true',
+ help='Include geometry of result')
group.add_argument('--lang', '--accept-language', metavar='LANGS',
help='Preferred language order for presenting search results')
def run(self, args: NominatimArgs) -> int:
+ place: napi.PlaceRef
if args.node:
- params = dict(osmtype='N', osmid=args.node)
+ place = napi.OsmID('N', args.node, args.object_class)
elif args.way:
- params = dict(osmtype='W', osmid=args.way)
+ place = napi.OsmID('W', args.way, args.object_class)
elif args.relation:
- params = dict(osmtype='R', osmid=args.relation)
+ place = napi.OsmID('R', args.relation, args.object_class)
else:
- params = dict(place_id=args.place_id)
- if args.object_class:
- params['class'] = args.object_class
- for name, _ in DETAILS_SWITCHES:
- params[name] = '1' if getattr(args, name) else '0'
+ assert args.place_id is not None
+ place = napi.PlaceID(args.place_id)
+
+ api = napi.NominatimAPI(args.project_dir)
+
+ details = napi.LookupDetails(address_details=args.addressdetails,
+ linked_places=args.linkedplaces,
+ parented_places=args.hierarchy,
+ keywords=args.keywords)
+ if args.polygon_geojson:
+ details.geometry_output = napi.GeometryFormat.GEOJSON
+
if args.lang:
- params['accept-language'] = args.lang
+ locales = napi.Locales.from_accept_languages(args.lang)
+ elif api.config.DEFAULT_LANGUAGE:
+ locales = napi.Locales.from_accept_languages(api.config.DEFAULT_LANGUAGE)
+ else:
+ locales = napi.Locales()
+
+ result = api.lookup(place, details)
+
+ if result:
+ output = api_output.format_result(
+ result,
+ 'details-json',
+ {'locales': locales,
+ 'group_hierarchy': args.group_hierarchy})
+ # reformat the result, so it is pretty-printed
+ json.dump(json.loads(output), sys.stdout, indent=4)
+ sys.stdout.write('\n')
+
+ return 0
- return _run_api('details', args, params)
+ LOG.error("Object not found in database.")
+ return 42
class APIStatus:
"""
def add_args(self, parser: argparse.ArgumentParser) -> None:
- formats = formatting.create(StatusResult).list_formats()
+ formats = api_output.list_formats(napi.StatusResult)
group = parser.add_argument_group('API parameters')
group.add_argument('--format', default=formats[0], choices=formats,
help='Format of result')
def run(self, args: NominatimArgs) -> int:
- status = NominatimAPI(args.project_dir).status()
- print(formatting.create(StatusResult).format(status, args.format))
+ status = napi.NominatimAPI(args.project_dir).status()
+ print(api_output.format_result(status, args.format, {}))
return 0
# Arguments to 'details'
object_class: Optional[str]
+ linkedplaces: bool
+ hierarchy: bool
+ keywords: bool
+ polygon_geojson: bool
+ group_hierarchy: bool
def osm2pgsql_options(self, default_cache: int,
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+SQLAlchemy definitions for all tables used by the frontend.
+"""
+from typing import Any
+
+import sqlalchemy as sa
+from geoalchemy2 import Geometry
+from sqlalchemy.dialects.postgresql import HSTORE, ARRAY, JSONB
+from sqlalchemy.dialects.sqlite import JSON as sqlite_json
+
+class PostgresTypes:
+ """ Type definitions for complex types as used in Postgres variants.
+ """
+ Composite = HSTORE
+ Json = JSONB
+ IntArray = ARRAY(sa.Integer()) #pylint: disable=invalid-name
+
+
+class SqliteTypes:
+ """ Type definitions for complex types as used in the SQLite variant.
+ """
+ Composite = sqlite_json
+ Json = sqlite_json
+ IntArray = sqlite_json
+
+
+#pylint: disable=too-many-instance-attributes
+class SearchTables:
+ """ Data class that holds the tables of the Nominatim database.
+ """
+
+ def __init__(self, meta: sa.MetaData, engine_name: str) -> None:
+ if engine_name == 'postgresql':
+ self.types: Any = PostgresTypes
+ elif engine_name == 'sqlite':
+ self.types = SqliteTypes
+ else:
+ raise ValueError("Only 'postgresql' and 'sqlite' engines are supported.")
+
+ self.meta = meta
+
+ self.import_status = sa.Table('import_status', meta,
+ sa.Column('lastimportdate', sa.DateTime(True), nullable=False),
+ sa.Column('sequence_id', sa.Integer),
+ sa.Column('indexed', sa.Boolean))
+
+ self.properties = sa.Table('nominatim_properties', meta,
+ sa.Column('property', sa.Text, nullable=False),
+ sa.Column('value', sa.Text))
+
+ self.placex = sa.Table('placex', meta,
+ sa.Column('place_id', sa.BigInteger, nullable=False, unique=True),
+ sa.Column('parent_place_id', sa.BigInteger),
+ sa.Column('linked_place_id', sa.BigInteger),
+ sa.Column('importance', sa.Float),
+ sa.Column('indexed_date', sa.DateTime),
+ sa.Column('rank_address', sa.SmallInteger),
+ sa.Column('rank_search', sa.SmallInteger),
+ sa.Column('partition', sa.SmallInteger),
+ sa.Column('indexed_status', sa.SmallInteger),
+ sa.Column('osm_type', sa.String(1), nullable=False),
+ sa.Column('osm_id', sa.BigInteger, nullable=False),
+ sa.Column('class', sa.Text, nullable=False, key='class_'),
+ sa.Column('type', sa.Text, nullable=False),
+ sa.Column('admin_level', sa.SmallInteger),
+ sa.Column('name', self.types.Composite),
+ sa.Column('address', self.types.Composite),
+ sa.Column('extratags', self.types.Composite),
+ sa.Column('geometry', Geometry(srid=4326), nullable=False),
+ sa.Column('wikipedia', sa.Text),
+ sa.Column('country_code', sa.String(2)),
+ sa.Column('housenumber', sa.Text),
+ sa.Column('postcode', sa.Text),
+ sa.Column('centroid', Geometry(srid=4326, spatial_index=False)))
+
+ self.addressline = sa.Table('place_addressline', meta,
+ sa.Column('place_id', sa.BigInteger, index=True),
+ sa.Column('address_place_id', sa.BigInteger, index=True),
+ sa.Column('distance', sa.Float),
+ sa.Column('cached_rank_address', sa.SmallInteger),
+ sa.Column('fromarea', sa.Boolean),
+ sa.Column('isaddress', sa.Boolean))
+
+ self.postcode = sa.Table('location_postcode', meta,
+ sa.Column('place_id', sa.BigInteger, unique=True),
+ sa.Column('parent_place_id', sa.BigInteger),
+ sa.Column('rank_search', sa.SmallInteger),
+ sa.Column('rank_address', sa.SmallInteger),
+ sa.Column('indexed_status', sa.SmallInteger),
+ sa.Column('indexed_date', sa.DateTime),
+ sa.Column('country_code', sa.String(2)),
+ sa.Column('postcode', sa.Text, index=True),
+ sa.Column('geometry', Geometry(srid=4326)))
+
+ self.osmline = sa.Table('location_property_osmline', meta,
+ sa.Column('place_id', sa.BigInteger, nullable=False, unique=True),
+ sa.Column('osm_id', sa.BigInteger),
+ sa.Column('parent_place_id', sa.BigInteger),
+ sa.Column('indexed_date', sa.DateTime),
+ sa.Column('startnumber', sa.Integer),
+ sa.Column('endnumber', sa.Integer),
+ sa.Column('step', sa.SmallInteger),
+ sa.Column('partition', sa.SmallInteger),
+ sa.Column('indexed_status', sa.SmallInteger),
+ sa.Column('linegeo', Geometry(srid=4326)),
+ sa.Column('address', self.types.Composite),
+ sa.Column('postcode', sa.Text),
+ sa.Column('country_code', sa.String(2)))
+
+ self.word = sa.Table('word', meta,
+ sa.Column('word_id', sa.Integer),
+ sa.Column('word_token', sa.Text, nullable=False),
+ sa.Column('type', sa.Text, nullable=False),
+ sa.Column('word', sa.Text),
+ sa.Column('info', self.types.Json))
+
+ self.country_name = sa.Table('country_name', meta,
+ sa.Column('country_code', sa.String(2)),
+ sa.Column('name', self.types.Composite),
+ sa.Column('derived_name', self.types.Composite),
+ sa.Column('country_default_language_code', sa.Text),
+ sa.Column('partition', sa.Integer))
+
+ self.country_grid = sa.Table('country_osm_grid', meta,
+ sa.Column('country_code', sa.String(2)),
+ sa.Column('area', sa.Float),
+ sa.Column('geometry', Geometry(srid=4326)))
+
+ # The following tables are not necessarily present.
+ self.search_name = sa.Table('search_name', meta,
+ sa.Column('place_id', sa.BigInteger, index=True),
+ sa.Column('importance', sa.Float),
+ sa.Column('search_rank', sa.SmallInteger),
+ sa.Column('address_rank', sa.SmallInteger),
+ sa.Column('name_vector', self.types.IntArray, index=True),
+ sa.Column('nameaddress_vector', self.types.IntArray, index=True),
+ sa.Column('country_code', sa.String(2)),
+ sa.Column('centroid', Geometry(srid=4326)))
+
+ self.tiger = sa.Table('location_property_tiger', meta,
+ sa.Column('place_id', sa.BigInteger),
+ sa.Column('parent_place_id', sa.BigInteger),
+ sa.Column('startnumber', sa.Integer),
+ sa.Column('endnumber', sa.Integer),
+ sa.Column('step', sa.SmallInteger),
+ sa.Column('partition', sa.SmallInteger),
+ sa.Column('linegeo', Geometry(srid=4326, spatial_index=False)),
+ sa.Column('postcode', sa.Text))
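As a usage sketch (not part of the patch), the table objects above combine into ordinary SQLAlchemy Core queries, for example:

    import sqlalchemy as sa

    # Illustrative only: look up a way by its OSM id with the Core tables.
    tables = SearchTables(sa.MetaData(), 'postgresql')
    sql = sa.select(tables.placex.c.place_id, tables.placex.c.rank_address)\
            .where(tables.placex.c.osm_type == 'W')\
            .where(tables.placex.c.osm_id == 4)
    # rows = await conn.execute(sql)   # with an AsyncConnection from the API engine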
+++ /dev/null
-# SPDX-License-Identifier: GPL-2.0-only
-#
-# This file is part of Nominatim. (https://nominatim.org)
-#
-# Copyright (C) 2022 by the Nominatim developer community.
-# For a full list of authors see the git log.
-"""
-Output formatters for API version v1.
-"""
-from typing import Dict, Any
-from collections import OrderedDict
-import json
-
-from nominatim.result_formatter.base import FormatDispatcher
-from nominatim.apicmd.status import StatusResult
-
-create = FormatDispatcher()
-
-@create.format_func(StatusResult, 'text')
-def _format_status_text(result: StatusResult) -> str:
- if result.status:
- return f"ERROR: {result.message}"
-
- return 'OK'
-
-
-@create.format_func(StatusResult, 'json')
-def _format_status_json(result: StatusResult) -> str:
- out: Dict[str, Any] = OrderedDict()
- out['status'] = result.status
- out['message'] = result.message
- if result.data_updated is not None:
- out['data_updated'] = result.data_updated.isoformat()
- out['software_version'] = str(result.software_version)
- if result.database_version is not None:
- out['database_version'] = str(result.database_version)
-
- return json.dumps(out)
#
# This file is part of Nominatim. (https://nominatim.org)
#
-# Copyright (C) 2022 by the Nominatim developer community.
+# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Server implementation using the falcon webserver framework.
"""
-from typing import Type, Any, Optional, Mapping
+from typing import Optional, Mapping, cast
from pathlib import Path
import falcon
-import falcon.asgi
+from falcon.asgi import App, Request, Response
from nominatim.api import NominatimAPIAsync
-from nominatim.apicmd.status import StatusResult
-import nominatim.result_formatter.v1 as formatting
+import nominatim.api.v1 as api_impl
+from nominatim.config import Configuration
-CONTENT_TYPE = {
- 'text': falcon.MEDIA_TEXT,
- 'xml': falcon.MEDIA_XML
-}
-class NominatimV1:
- """ Implementation of V1 version of the Nominatim API.
+class ParamWrapper(api_impl.ASGIAdaptor):
+ """ Adaptor class for server glue to Falcon framework.
"""
- def __init__(self, project_dir: Path, environ: Optional[Mapping[str, str]]) -> None:
- self.api = NominatimAPIAsync(project_dir, environ)
- self.formatters = {}
+ def __init__(self, req: Request, resp: Response,
+ config: Configuration) -> None:
+ self.request = req
+ self.response = resp
+ self._config = config
- for rtype in (StatusResult, ):
- self.formatters[rtype] = formatting.create(rtype)
+ def get(self, name: str, default: Optional[str] = None) -> Optional[str]:
+ return cast(Optional[str], self.request.get_param(name, default=default))
- def parse_format(self, req: falcon.asgi.Request, rtype: Type[Any], default: str) -> None:
- """ Get and check the 'format' parameter and prepare the formatter.
- `rtype` describes the expected return type and `default` the
- format value to assume when no parameter is present.
- """
- req.context.format = req.get_param('format', default=default)
- req.context.formatter = self.formatters[rtype]
- if not req.context.formatter.supports_format(req.context.format):
- raise falcon.HTTPBadRequest(
- description="Parameter 'format' must be one of: " +
- ', '.join(req.context.formatter.list_formats()))
+ def get_header(self, name: str, default: Optional[str] = None) -> Optional[str]:
+ return cast(Optional[str], self.request.get_header(name, default=default))
- def format_response(self, req: falcon.asgi.Request, resp: falcon.asgi.Response,
- result: Any) -> None:
- """ Render response into a string according to the formatter
- set in `parse_format()`.
- """
- resp.text = req.context.formatter.format(result, req.context.format)
- resp.content_type = CONTENT_TYPE.get(req.context.format, falcon.MEDIA_JSON)
+ def error(self, msg: str, status: int = 400) -> falcon.HTTPError:
+ if status == 400:
+ return falcon.HTTPBadRequest(description=msg)
+ if status == 404:
+ return falcon.HTTPNotFound(description=msg)
+ return falcon.HTTPError(status, description=msg)
- async def on_get_status(self, req: falcon.asgi.Request, resp: falcon.asgi.Response) -> None:
- """ Implementation of status endpoint.
- """
- self.parse_format(req, StatusResult, 'text')
- result = await self.api.status()
+ def create_response(self, status: int, output: str, content_type: str) -> None:
+ self.response.status = status
+ self.response.text = output
+ self.response.content_type = content_type
+
+
+ def config(self) -> Configuration:
+ return self._config
+
- self.format_response(req, resp, result)
- if result.status and req.context.format == 'text':
- resp.status = 500
+class EndpointWrapper:
+ """ Converter for server glue endpoint functions to Falcon request handlers.
+ """
+
+ def __init__(self, func: api_impl.EndpointFunc, api: NominatimAPIAsync) -> None:
+ self.func = func
+ self.api = api
+
+
+ async def on_get(self, req: Request, resp: Response) -> None:
+ """ Implementation of the endpoint.
+ """
+ await self.func(self.api, ParamWrapper(req, resp, self.api.config))
def get_application(project_dir: Path,
- environ: Optional[Mapping[str, str]] = None) -> falcon.asgi.App:
- """ Create a Nominatim falcon ASGI application.
+ environ: Optional[Mapping[str, str]] = None) -> App:
+ """ Create a Nominatim Falcon ASGI application.
"""
- app = falcon.asgi.App()
+ api = NominatimAPIAsync(project_dir, environ)
- api = NominatimV1(project_dir, environ)
+ app = App(cors_enable=api.config.get_bool('CORS_NOACCESSCONTROL'))
- app.add_route('/status', api, suffix='status')
+ legacy_urls = api.config.get_bool('SERVE_LEGACY_URLS')
+ for name, func in api_impl.ROUTES:
+ endpoint = EndpointWrapper(func, api)
+ app.add_route(f"/{name}", endpoint)
+ if legacy_urls:
+ app.add_route(f"/{name}.php", endpoint)
return app
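For local testing, the returned ASGI app can also be served directly with uvicorn; a rough sketch (project path and port are placeholders):

    # Illustrative only: serve the Falcon application with uvicorn.
    from pathlib import Path
    import uvicorn

    app = get_application(Path('/path/to/project'))   # placeholder project directory
    uvicorn.run(app, host='127.0.0.1', port=8088)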
#
# This file is part of Nominatim. (https://nominatim.org)
#
-# Copyright (C) 2022 by the Nominatim developer community.
+# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Server implementation using the sanic webserver framework.
"""
-from typing import Any, Optional, Mapping
+from typing import Any, Optional, Mapping, Callable, cast, Coroutine
from pathlib import Path
-import sanic
+from sanic import Request, HTTPResponse, Sanic
+from sanic.exceptions import SanicException
+from sanic.response import text as TextResponse
from nominatim.api import NominatimAPIAsync
-from nominatim.apicmd.status import StatusResult
-import nominatim.result_formatter.v1 as formatting
+import nominatim.api.v1 as api_impl
+from nominatim.config import Configuration
-api = sanic.Blueprint('NominatimAPI')
+class ParamWrapper(api_impl.ASGIAdaptor):
+ """ Adaptor class for server glue to Sanic framework.
+ """
-CONTENT_TYPE = {
- 'text': 'text/plain; charset=utf-8',
- 'xml': 'text/xml; charset=utf-8'
-}
+ def __init__(self, request: Request) -> None:
+ self.request = request
-def usage_error(msg: str) -> sanic.HTTPResponse:
- """ Format the response for an error with the query parameters.
- """
- return sanic.response.text(msg, status=400)
+ def get(self, name: str, default: Optional[str] = None) -> Optional[str]:
+ return cast(Optional[str], self.request.args.get(name, default))
-def api_response(request: sanic.Request, result: Any) -> sanic.HTTPResponse:
- """ Render a response from the query results using the configured
- formatter.
- """
- body = request.ctx.formatter.format(result, request.ctx.format)
- return sanic.response.text(body,
- content_type=CONTENT_TYPE.get(request.ctx.format,
- 'application/json'))
-
-
-@api.on_request # type: ignore[misc]
-async def extract_format(request: sanic.Request) -> Optional[sanic.HTTPResponse]:
- """ Get and check the 'format' parameter and prepare the formatter.
- `ctx.result_type` describes the expected return type and
- `ctx.default_format` the format value to assume when no parameter
- is present.
- """
- assert request.route is not None
- request.ctx.formatter = request.app.ctx.formatters[request.route.ctx.result_type]
- request.ctx.format = request.args.get('format', request.route.ctx.default_format)
- if not request.ctx.formatter.supports_format(request.ctx.format):
- return usage_error("Parameter 'format' must be one of: " +
- ', '.join(request.ctx.formatter.list_formats()))
+ def get_header(self, name: str, default: Optional[str] = None) -> Optional[str]:
+ return cast(Optional[str], self.request.headers.get(name, default))
- return None
+ def error(self, msg: str, status: int = 400) -> SanicException:
+ return SanicException(msg, status_code=status)
-@api.get('/status', ctx_result_type=StatusResult, ctx_default_format='text')
-async def status(request: sanic.Request) -> sanic.HTTPResponse:
- """ Implementation of status endpoint.
- """
- result = await request.app.ctx.api.status()
- response = api_response(request, result)
- if request.ctx.format == 'text' and result.status:
- response.status = 500
+ def create_response(self, status: int, output: str,
+ content_type: str) -> HTTPResponse:
+ return TextResponse(output, status=status, content_type=content_type)
- return response
+
+ def config(self) -> Configuration:
+ return cast(Configuration, self.request.app.ctx.api.config)
+
+
+def _wrap_endpoint(func: api_impl.EndpointFunc)\
+ -> Callable[[Request], Coroutine[Any, Any, HTTPResponse]]:
+ async def _callback(request: Request) -> HTTPResponse:
+ return cast(HTTPResponse, await func(request.app.ctx.api, ParamWrapper(request)))
+
+ return _callback
def get_application(project_dir: Path,
- environ: Optional[Mapping[str, str]] = None) -> sanic.Sanic:
+ environ: Optional[Mapping[str, str]] = None) -> Sanic:
""" Create a Nominatim sanic ASGI application.
"""
- app = sanic.Sanic("NominatimInstance")
+ app = Sanic("NominatimInstance")
app.ctx.api = NominatimAPIAsync(project_dir, environ)
- app.ctx.formatters = {}
- for rtype in (StatusResult, ):
- app.ctx.formatters[rtype] = formatting.create(rtype)
- app.blueprint(api)
+ if app.ctx.api.config.get_bool('CORS_NOACCESSCONTROL'):
+ from sanic_cors import CORS # pylint: disable=import-outside-toplevel
+ CORS(app)
+
+ legacy_urls = app.ctx.api.config.get_bool('SERVE_LEGACY_URLS')
+ for name, func in api_impl.ROUTES:
+ endpoint = _wrap_endpoint(func)
+ app.add_route(endpoint, f"/{name}", name=f"v1_{name}_simple")
+ if legacy_urls:
+ app.add_route(endpoint, f"/{name}.php", name=f"v1_{name}_legacy")
return app
#
# This file is part of Nominatim. (https://nominatim.org)
#
-# Copyright (C) 2022 by the Nominatim developer community.
+# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Server implementation using the starlette webserver framework.
"""
-from typing import Any, Type, Optional, Mapping
+from typing import Any, Optional, Mapping, Callable, cast, Coroutine
from pathlib import Path
from starlette.applications import Starlette
from starlette.exceptions import HTTPException
from starlette.responses import Response
from starlette.requests import Request
+from starlette.middleware import Middleware
+from starlette.middleware.cors import CORSMiddleware
from nominatim.api import NominatimAPIAsync
-from nominatim.apicmd.status import StatusResult
-import nominatim.result_formatter.v1 as formatting
+import nominatim.api.v1 as api_impl
+from nominatim.config import Configuration
-CONTENT_TYPE = {
- 'text': 'text/plain; charset=utf-8',
- 'xml': 'text/xml; charset=utf-8'
-}
+class ParamWrapper(api_impl.ASGIAdaptor):
+ """ Adaptor class for server glue to Starlette framework.
+ """
-FORMATTERS = {
- StatusResult: formatting.create(StatusResult)
-}
+ def __init__(self, request: Request) -> None:
+ self.request = request
-def parse_format(request: Request, rtype: Type[Any], default: str) -> None:
- """ Get and check the 'format' parameter and prepare the formatter.
- `rtype` describes the expected return type and `default` the
- format value to assume when no parameter is present.
- """
- fmt = request.query_params.get('format', default=default)
- fmtter = FORMATTERS[rtype]
+ def get(self, name: str, default: Optional[str] = None) -> Optional[str]:
+ return self.request.query_params.get(name, default=default)
- if not fmtter.supports_format(fmt):
- raise HTTPException(400, detail="Parameter 'format' must be one of: " +
- ', '.join(fmtter.list_formats()))
- request.state.format = fmt
- request.state.formatter = fmtter
+ def get_header(self, name: str, default: Optional[str] = None) -> Optional[str]:
+ return self.request.headers.get(name, default)
-def format_response(request: Request, result: Any) -> Response:
- """ Render response into a string according to the formatter
- set in `parse_format()`.
- """
- fmt = request.state.format
- return Response(request.state.formatter.format(result, fmt),
- media_type=CONTENT_TYPE.get(fmt, 'application/json'))
+ def error(self, msg: str, status: int = 400) -> HTTPException:
+ return HTTPException(status, detail=msg)
-async def on_status(request: Request) -> Response:
- """ Implementation of status endpoint.
- """
- parse_format(request, StatusResult, 'text')
- result = await request.app.state.API.status()
- response = format_response(request, result)
+ def create_response(self, status: int, output: str, content_type: str) -> Response:
+ return Response(output, status_code=status, media_type=content_type)
+
- if request.state.format == 'text' and result.status:
- response.status_code = 500
+ def config(self) -> Configuration:
+ return cast(Configuration, self.request.app.state.API.config)
- return response
+def _wrap_endpoint(func: api_impl.EndpointFunc)\
+ -> Callable[[Request], Coroutine[Any, Any, Response]]:
+ async def _callback(request: Request) -> Response:
+ return cast(Response, await func(request.app.state.API, ParamWrapper(request)))
+
+ return _callback
-V1_ROUTES = [
- Route('/status', endpoint=on_status)
-]
def get_application(project_dir: Path,
environ: Optional[Mapping[str, str]] = None) -> Starlette:
""" Create a Nominatim falcon ASGI application.
"""
- app = Starlette(debug=True, routes=V1_ROUTES)
+ config = Configuration(project_dir, environ)
+
+ routes = []
+ legacy_urls = config.get_bool('SERVE_LEGACY_URLS')
+ for name, func in api_impl.ROUTES:
+ endpoint = _wrap_endpoint(func)
+ routes.append(Route(f"/{name}", endpoint=endpoint))
+ if legacy_urls:
+ routes.append(Route(f"/{name}.php", endpoint=endpoint))
+
+ middleware = []
+ if config.get_bool('CORS_NOACCESSCONTROL'):
+ middleware.append(Middleware(CORSMiddleware, allow_origins=['*']))
+
+ app = Starlette(debug=True, routes=routes, middleware=middleware)
app.state.API = NominatimAPIAsync(project_dir, environ)
there.
"""
def decorator(func: Callable[..., None]) -> Callable[..., None]:
- version = (NominatimVersion(major, minor, patch, dbpatch))
+ version = NominatimVersion(major, minor, patch, dbpatch)
_MIGRATION_FUNCTIONS.append((version, func))
return func
cur.execute("""
UPDATE placex SET (wikipedia, importance) =
(SELECT wikipedia, importance
- FROM compute_importance(extratags, country_code, osm_type, osm_id, centroid))
+ FROM compute_importance(extratags, country_code, rank_search, centroid))
""")
cur.execute("""
UPDATE placex s SET wikipedia = d.wikipedia, importance = d.importance
#
# This file is part of Nominatim. (https://nominatim.org)
#
-# Copyright (C) 2022 by the Nominatim developer community.
+# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Type definitions for typing annotations.
Protocol = object
Final = 'Final'
TypedDict = dict
+
+
+# SQLAlchemy introduced generic types in version 2.0, making typing
+# incompatible with older versions. Add wrappers here so we don't have
+# to litter the code with bare-string types.
+
+if TYPE_CHECKING:
+ import sqlalchemy as sa
+ from typing_extensions import (TypeAlias as TypeAlias)
+else:
+ TypeAlias = str
+
+SaSelect: TypeAlias = 'sa.Select[Any]'
+SaRow: TypeAlias = 'sa.Row[Any]'
+SaColumn: TypeAlias = 'sa.Column[Any]'
+SaLabel: TypeAlias = 'sa.Label[Any]'
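A sketch of how the aliases are meant to be used in annotations (hypothetical helper, not from the patch); at runtime they are plain strings on SQLAlchemy 1.x, while mypy with SQLAlchemy 2.0 resolves them to the generic types:

    # Illustrative only: annotate Core constructs without version-specific imports.
    def add_limit(sql: SaSelect, max_rows: int) -> SaSelect:
        """ Restrict the given select statement to at most 'max_rows' rows. """
        return sql.limit(max_rows)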
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Streaming JSON encoder.
+"""
+from typing import Any, TypeVar, Optional, Callable
+import io
+try:
+ import ujson as json
+except ModuleNotFoundError:
+ import json # type: ignore[no-redef]
+
+T = TypeVar('T') # pylint: disable=invalid-name
+
+class JsonWriter:
+ """ JSON encoder that renders the output directly into an output
+ stream. This is a very simple writer which produces JSON in as
+ compact a form as possible.
+
+ The writer does not check for syntactic correctness. It is the
+ responsibility of the caller to call the write functions in an
+ order that produces correct JSON.
+
+ All functions return the writer object itself so that function
+ calls can be chained.
+ """
+
+ def __init__(self) -> None:
+ self.data = io.StringIO()
+ self.pending = ''
+
+
+ def __call__(self) -> str:
+ """ Return the rendered JSON content as a string.
+ The writer remains usable after calling this function.
+ """
+ if self.pending:
+ assert self.pending in (']', '}')
+ self.data.write(self.pending)
+ self.pending = ''
+ return self.data.getvalue()
+
+
+ def start_object(self) -> 'JsonWriter':
+ """ Write the open bracket of a JSON object.
+ """
+ if self.pending:
+ self.data.write(self.pending)
+ self.pending = '{'
+ return self
+
+
+ def end_object(self) -> 'JsonWriter':
+ """ Write the closing bracket of a JSON object.
+ """
+ assert self.pending in (',', '{', '')
+ if self.pending == '{':
+ self.data.write(self.pending)
+ self.pending = '}'
+ return self
+
+
+ def start_array(self) -> 'JsonWriter':
+ """ Write the opening bracket of a JSON array.
+ """
+ if self.pending:
+ self.data.write(self.pending)
+ self.pending = '['
+ return self
+
+
+ def end_array(self) -> 'JsonWriter':
+ """ Write the closing bracket of a JSON array.
+ """
+ assert self.pending in (',', '[', '')
+ if self.pending == '[':
+ self.data.write(self.pending)
+ self.pending = ']'
+ return self
+
+
+ def key(self, name: str) -> 'JsonWriter':
+ """ Write the key string of a JSON object.
+ """
+ assert self.pending
+ self.data.write(self.pending)
+ self.data.write(json.dumps(name, ensure_ascii=False))
+ self.pending = ':'
+ return self
+
+
+ def value(self, value: Any) -> 'JsonWriter':
+ """ Write out a value as JSON. The function uses the json.dumps()
+ function for encoding the JSON. Thus any value that can be
+ encoded by that function is permissible here.
+ """
+ return self.raw(json.dumps(value, ensure_ascii=False))
+
+
+ def next(self) -> 'JsonWriter':
+ """ Write out a delimiter comma between JSON object or array elements.
+ """
+ if self.pending:
+ self.data.write(self.pending)
+ self.pending = ','
+ return self
+
+
+ def raw(self, raw_json: str) -> 'JsonWriter':
+ """ Write out the given value as is. This function is useful if
+ a value is already available in JSON format.
+ """
+ if self.pending:
+ self.data.write(self.pending)
+ self.pending = ''
+ self.data.write(raw_json)
+ return self
+
+
+ def keyval(self, key: str, value: Any) -> 'JsonWriter':
+ """ Write out an object element with the given key and value.
+ This is a shortcut for calling 'key()', 'value()' and 'next()'.
+ """
+ self.key(key)
+ self.value(value)
+ return self.next()
+
+
+ def keyval_not_none(self, key: str, value: Optional[T],
+ transform: Optional[Callable[[T], Any]] = None) -> 'JsonWriter':
+ """ Write out an object element only if the value is not None.
+ If 'transform' is given, it must be a function that takes the
+ value type and returns a JSON encodable type. The transform
+ function will be called before the value is written out.
+ """
+ if value is not None:
+ self.key(key)
+ self.value(transform(value) if transform else value)
+ self.next()
+ return self
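A short usage sketch of the chained-call style described in the class docstring (values are made up):

    import datetime as dt

    # Illustrative only: build a small object with the writer defined above.
    out = JsonWriter()
    out.start_object()\
       .keyval('status', 0)\
       .keyval_not_none('message', None)\
       .keyval_not_none('data_updated', dt.datetime(2023, 1, 1),
                        lambda v: v.isoformat())\
       .key('versions').start_array()\
           .value(1).next().value(2).next()\
       .end_array().next()\
       .end_object()
    print(out())
    # {"status":0,"data_updated":"2023-01-01T00:00:00","versions":[1,2]}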
# Set to zero to disable polygon output.
NOMINATIM_POLYGON_OUTPUT_MAX_TYPES=1
+# Offer backwards compatible PHP URLs.
+# When running one of the Python engines, they will add endpoint aliases
+# under <endpoint>.php
+NOMINATIM_SERVE_LEGACY_URLS=yes
+
### Log settings
#
# The following options allow to enable logging of API requests.
default-pattern: "[A-Z0-9- ]{3,12}"
- step: clean-tiger-tags
- step: split-name-list
+ delimiters: ;
- step: strip-brace-terms
- step: tag-analyzer-by-language
filter-kind: [".*name.*"]
Then the result is valid json
And result has attributes geometry
And result has not attributes keywords,address,linked_places,parentof
+ And results contain
+ | geometry+type |
+ | Point |
Scenario: JSON Details with pretty printing
When sending json details query for W297699560
| keywords |
| 1 |
Then the result is valid json
+ And result has attributes keywords
Scenario Outline: JSON details with full geometry
When sending json details query for <osmid>
| 1 |
Then the result is valid json
And result has attributes geometry
+ And results contain
+ | geometry+type |
+ | <geometry> |
Examples:
- | osmid |
- | W297699560 |
- | W243055645 |
- | W243055716 |
- | W43327921 |
+ | osmid | geometry |
+ | W297699560 | LineString |
+ | W243055645 | Polygon |
+ | W243055716 | Polygon |
+ | W43327921 | LineString |
| place_id |
| 107077 |
+
Scenario Outline: Details via OSM id
When sending details query for <type><id>
Then the result is valid json
| W | 43327921 |
| R | 123924 |
- Scenario: Details for interpolation way just return the dependent street
- When sending details query for W1
- Then the result is valid json
- And results contain
- | category |
- | highway |
Scenario Outline: Details for different class types for the same OSM id
When sending details query for N300209696:<class>
| natural |
| mountain_pass |
+
Scenario Outline: Details via unknown OSM id
When sending details query for <object>
Then a HTTP 404 is returned
| N300209696:highway |
+ @v1-api-php-only
+ Scenario: Details for interpolation way just return the dependent street
+ When sending details query for W1
+ Then the result is valid json
+ And results contain
+ | category |
+ | highway |
+
+
+ @v1-api-python-only
+ Scenario: Details for interpolation way return the interpolation
+ When sending details query for W1
+ Then the result is valid json
+ And results contain
+ | category | type | osm_type | osm_id | admin_level |
+ | place | houses | W | 1 | 15 |
+
+
+ @v1-api-php-only
+ Scenario: Details for Tiger way just return the dependent street
+ When sending details query for 112871
+ Then the result is valid json
+ And results contain
+ | category |
+ | highway |
+
+
+ @v1-api-python-only
+ Scenario: Details for Tiger way return the interpolation
+ When sending details query for 112871
+ Then the result is valid json
+ And results contain
+ | category | type | admin_level |
+ | place | houses | 15 |
+ And result has not attributes osm_type,osm_id
+
+
+ @v1-api-php-only
+ Scenario: Details for postcodes just return the dependent place
+ When sending details query for 112820
+ Then the result is valid json
+ And results contain
+ | category |
+ | boundary |
+
+
+ @v1-api-python-only
+ Scenario: Details for postcodes return the postcode
+ When sending details query for 112820
+ Then the result is valid json
+ And results contain
+ | category | type | admin_level |
+ | place | postcode | 15 |
+ And result has not attributes osm_type,osm_id
Feature: Creation of search terms
Tests that search_name table is filled correctly
- Scenario Outline: Comma- and semicolon separated names appear as full names
+ Scenario: Semicolon-separated names appear as separate full names
Given the places
| osm | class | type | name+alt_name |
- | N1 | place | city | New York<sep>Big Apple |
+ | N1 | place | city | New York; Big Apple |
When importing
Then search_name contains
| object | name_vector |
| N1 | #New York, #Big Apple |
- Examples:
- | sep |
- | , |
- | ; |
+ @fail-legacy
+ Scenario: Comma-separated names appear as a single full name
+ Given the places
+ | osm | class | type | name+alt_name |
+ | N1 | place | city | New York, Big Apple |
+ When importing
+ Then search_name contains
+ | object | name_vector |
+ | N1 | #New York Big Apple |
- Scenario Outline: Name parts before brackets appear as full names
+ Scenario: Name parts before brackets appear as full names
Given the places
| osm | class | type | name+name |
| N1 | place | city | Halle (Saale) |
if tag == 'fail-legacy':
if context.config.userdata['TOKENIZER'] == 'legacy':
context.scenario.skip("Not implemented in legacy tokenizer")
+ if tag == 'v1-api-php-only':
+ if context.config.userdata['API_ENGINE'] != 'php':
+ context.scenario.skip("Only valid with PHP version of v1 API.")
+ if tag == 'v1-api-python-only':
+ if context.config.userdata['API_ENGINE'] == 'php':
+ context.scenario.skip("Only valid with Python version of v1 API.")
#
# This file is part of Nominatim. (https://nominatim.org)
#
-# Copyright (C) 2022 by the Nominatim developer community.
+# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Classes wrapping HTTP responses from the Nominatim API.
assert str(self.result[idx][field]) == str(value), \
BadRowValueAssert(self, idx, field, value)
+
+ def assert_subfield(self, idx, path, value):
+ """ Check that result row `idx` has a nested field under the key
+ sequence `path` with the given value.
+ """
+ assert path
+
+ field = self.result[idx]
+ for p in path:
+ assert isinstance(field, OrderedDict)
+ assert p in field
+ field = field[p]
+
+ if isinstance(value, float):
+ assert Almost(value) == float(field)
+ elif value.startswith("^"):
+ assert re.fullmatch(value, field)
+ elif isinstance(field, OrderedDict):
+ assert field == eval('{' + value + '}')
+ else:
+ assert str(field) == str(value)
+
+
def assert_address_field(self, idx, field, value):
""" Check that result rows`idx` has a field `field` with value `value`
in its address. If idx is None, then all results are checked.
raise RuntimeError("Context needed when using grid coordinates")
self.assert_field(i, 'lat', float(lat))
self.assert_field(i, 'lon', float(lon))
+ elif '+' in name:
+ self.assert_subfield(i, name.split('+'), value)
else:
self.assert_field(i, name, value)
from asgi_lifespan import LifespanManager
import httpx
- async def _request(endpoint, params, project_dir, environ):
+ async def _request(endpoint, params, project_dir, environ, http_headers):
app = nominatim.server.starlette.server.get_application(project_dir, environ)
async with LifespanManager(app):
async with httpx.AsyncClient(app=app, base_url="http://nominatim.test") as client:
- response = await client.get(f"/{endpoint}", params=params)
+ response = await client.get(f"/{endpoint}", params=params,
+ headers=http_headers)
return response.text, response.status_code
def create_api_request_func_sanic(self):
import nominatim.server.sanic.server
- async def _request(endpoint, params, project_dir, environ):
+ async def _request(endpoint, params, project_dir, environ, http_headers):
app = nominatim.server.sanic.server.get_application(project_dir, environ)
- _, response = await app.asgi_client.get(f"/{endpoint}", params=params)
+ _, response = await app.asgi_client.get(f"/{endpoint}", params=params,
+ headers=http_headers)
return response.text, response.status_code
import nominatim.server.falcon.server
import falcon.testing
- async def _request(endpoint, params, project_dir, environ):
+ async def _request(endpoint, params, project_dir, environ, http_headers):
app = nominatim.server.falcon.server.get_application(project_dir, environ)
async with falcon.testing.ASGIConductor(app) as conductor:
- response = await conductor.get(f"/{endpoint}", params=params)
+ response = await conductor.get(f"/{endpoint}", params=params,
+ headers=http_headers)
return response.text, response.status_code
return asyncio.run(context.nominatim.api_engine(endpoint, params,
Path(context.nominatim.website_dir.name),
- context.nominatim.test_env))
+ context.nominatim.test_env,
+ getattr(context, 'http_headers', {})))
#
# This file is part of Nominatim. (https://nominatim.org)
#
-# Copyright (C) 2022 by the Nominatim developer community.
+# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Helper fixtures for API call tests.
from pathlib import Path
import pytest
import time
+import datetime as dt
+
+import nominatim.api as napi
+from nominatim.db.sql_preprocessor import SQLPreprocessor
+
+class APITester:
+
+ def __init__(self):
+ self.api = napi.NominatimAPI(Path('/invalid'))
+ self.async_to_sync(self.api._async_api.setup_database())
+
+
+ def async_to_sync(self, func):
+ """ Run an asynchronous function until completion using the
+ internal loop of the API.
+ """
+ return self.api._loop.run_until_complete(func)
+
+
+ def add_data(self, table, data):
+ """ Insert data into the given table.
+ """
+ sql = getattr(self.api._async_api._tables, table).insert()
+ self.async_to_sync(self.exec_async(sql, data))
+
+
+ def add_placex(self, **kw):
+ name = kw.get('name')
+ if isinstance(name, str):
+ name = {'name': name}
+
+ self.add_data('placex',
+ {'place_id': kw.get('place_id', 1000),
+ 'osm_type': kw.get('osm_type', 'W'),
+ 'osm_id': kw.get('osm_id', 4),
+ 'class_': kw.get('class_', 'highway'),
+ 'type': kw.get('type', 'residential'),
+ 'name': name,
+ 'address': kw.get('address'),
+ 'extratags': kw.get('extratags'),
+ 'parent_place_id': kw.get('parent_place_id'),
+ 'linked_place_id': kw.get('linked_place_id'),
+ 'admin_level': kw.get('admin_level', 15),
+ 'country_code': kw.get('country_code'),
+ 'housenumber': kw.get('housenumber'),
+ 'postcode': kw.get('postcode'),
+ 'wikipedia': kw.get('wikipedia'),
+ 'rank_search': kw.get('rank_search', 30),
+ 'rank_address': kw.get('rank_address', 30),
+ 'importance': kw.get('importance'),
+ 'centroid': 'SRID=4326;POINT(%f %f)' % kw.get('centroid', (23.0, 34.0)),
+ 'indexed_date': kw.get('indexed_date',
+ dt.datetime(2022, 12, 7, 14, 14, 46, 0)),
+ 'geometry': 'SRID=4326;' + kw.get('geometry', 'POINT(23 34)')})
+
+
+ def add_address_placex(self, object_id, **kw):
+ self.add_placex(**kw)
+ self.add_data('addressline',
+ {'place_id': object_id,
+ 'address_place_id': kw.get('place_id', 1000),
+ 'distance': kw.get('distance', 0.0),
+ 'cached_rank_address': kw.get('rank_address', 30),
+ 'fromarea': kw.get('fromarea', False),
+ 'isaddress': kw.get('isaddress', True)})
+
+
+ def add_osmline(self, **kw):
+ self.add_data('osmline',
+ {'place_id': kw.get('place_id', 10000),
+ 'osm_id': kw.get('osm_id', 4004),
+ 'parent_place_id': kw.get('parent_place_id'),
+ 'indexed_date': kw.get('indexed_date',
+ dt.datetime(2022, 12, 7, 14, 14, 46, 0)),
+ 'startnumber': kw.get('startnumber', 2),
+ 'endnumber': kw.get('endnumber', 6),
+ 'step': kw.get('step', 2),
+ 'address': kw.get('address'),
+ 'postcode': kw.get('postcode'),
+ 'country_code': kw.get('country_code'),
+ 'linegeo': 'SRID=4326;' + kw.get('geometry', 'LINESTRING(1.1 -0.2, 1.09 -0.22)')})
+
+
+ def add_tiger(self, **kw):
+ self.add_data('tiger',
+ {'place_id': kw.get('place_id', 30000),
+ 'parent_place_id': kw.get('parent_place_id'),
+ 'startnumber': kw.get('startnumber', 2),
+ 'endnumber': kw.get('endnumber', 6),
+ 'step': kw.get('step', 2),
+ 'postcode': kw.get('postcode'),
+ 'linegeo': 'SRID=4326;' + kw.get('geometry', 'LINESTRING(1.1 -0.2, 1.09 -0.22)')})
+
+
+ def add_postcode(self, **kw):
+ self.add_data('postcode',
+ {'place_id': kw.get('place_id', 1000),
+ 'parent_place_id': kw.get('parent_place_id'),
+ 'country_code': kw.get('country_code'),
+ 'postcode': kw.get('postcode'),
+ 'rank_search': kw.get('rank_search', 20),
+ 'rank_address': kw.get('rank_address', 22),
+ 'indexed_date': kw.get('indexed_date',
+ dt.datetime(2022, 12, 7, 14, 14, 46, 0)),
+ 'geometry': 'SRID=4326;' + kw.get('geometry', 'POINT(23 34)')})
+
+
+ async def exec_async(self, sql, *args, **kwargs):
+ async with self.api._async_api.begin() as conn:
+ return await conn.execute(sql, *args, **kwargs)
+
+
+ async def create_tables(self):
+ async with self.api._async_api._engine.begin() as conn:
+ await conn.run_sync(self.api._async_api._tables.meta.create_all)
-from nominatim.api import NominatimAPI
@pytest.fixture
-def apiobj(temp_db):
+def apiobj(temp_db_with_extensions, temp_db_conn, monkeypatch):
""" Create an asynchronous SQLAlchemy engine for the test DB.
"""
- api = NominatimAPI(Path('/invalid'), {})
- yield api
- api.close()
+ monkeypatch.setenv('NOMINATIM_USE_US_TIGER_DATA', 'yes')
+ testapi = APITester()
+ testapi.async_to_sync(testapi.create_tables())
+
+ SQLPreprocessor(temp_db_conn, testapi.api.config)\
+ .run_sql_file(temp_db_conn, 'functions/address_lookup.sql')
+
+ yield testapi
+
+ testapi.api.close()
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Tests for enhanced connection class for API functions.
+"""
+from pathlib import Path
+import pytest
+import pytest_asyncio
+
+import sqlalchemy as sa
+
+from nominatim.api import NominatimAPIAsync
+
+@pytest_asyncio.fixture
+async def apiobj(temp_db):
+ """ Create an asynchronous SQLAlchemy engine for the test DB.
+ """
+ api = NominatimAPIAsync(Path('/invalid'), {})
+ yield api
+ await api.close()
+
+
+@pytest.mark.asyncio
+async def test_run_scalar(apiobj, table_factory):
+ table_factory('foo', definition='that TEXT', content=(('a', ),))
+
+ async with apiobj.begin() as conn:
+ assert await conn.scalar(sa.text('SELECT * FROM foo')) == 'a'
+
+
+@pytest.mark.asyncio
+async def test_run_execute(apiobj, table_factory):
+ table_factory('foo', definition='that TEXT', content=(('a', ),))
+
+ async with apiobj.begin() as conn:
+ result = await conn.execute(sa.text('SELECT * FROM foo'))
+ assert result.fetchone()[0] == 'a'
+
+
+@pytest.mark.asyncio
+async def test_get_property_existing_cached(apiobj, table_factory):
+ table_factory('nominatim_properties',
+ definition='property TEXT, value TEXT',
+ content=(('dbv', '96723'), ))
+
+ async with apiobj.begin() as conn:
+ assert await conn.get_property('dbv') == '96723'
+
+ await conn.execute(sa.text('TRUNCATE nominatim_properties'))
+
+ assert await conn.get_property('dbv') == '96723'
+
+
+@pytest.mark.asyncio
+async def test_get_property_existing_uncached(apiobj, table_factory):
+ table_factory('nominatim_properties',
+ definition='property TEXT, value TEXT',
+ content=(('dbv', '96723'), ))
+
+ async with apiobj.begin() as conn:
+ assert await conn.get_property('dbv') == '96723'
+
+ await conn.execute(sa.text("UPDATE nominatim_properties SET value = '1'"))
+
+ assert await conn.get_property('dbv', cached=False) == '1'
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize('param', ['foo', 'DB:server_version'])
+async def test_get_property_missing(apiobj, table_factory, param):
+ table_factory('nominatim_properties',
+ definition='property TEXT, value TEXT')
+
+ async with apiobj.begin() as conn:
+ with pytest.raises(ValueError):
+ await conn.get_property(param)
+
+
+@pytest.mark.asyncio
+async def test_get_db_property_existing(apiobj):
+ async with apiobj.begin() as conn:
+ assert await conn.get_db_property('server_version') > 0
+
+
+@pytest.mark.asyncio
+async def test_get_db_property_bad_name(apiobj):
+ async with apiobj.begin() as conn:
+ with pytest.raises(ValueError):
+ await conn.get_db_property('dfkgjd.rijg')
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Tests for lookup API call.
+"""
+import datetime as dt
+
+import pytest
+
+import nominatim.api as napi
+
+@pytest.mark.parametrize('idobj', (napi.PlaceID(332), napi.OsmID('W', 4),
+ napi.OsmID('W', 4, 'highway')))
+def test_lookup_in_placex(apiobj, idobj):
+ import_date = dt.datetime(2022, 12, 7, 14, 14, 46, 0)
+ apiobj.add_placex(place_id=332, osm_type='W', osm_id=4,
+ class_='highway', type='residential',
+ name={'name': 'Road'}, address={'city': 'Barrow'},
+ extratags={'surface': 'paved'},
+ parent_place_id=34, linked_place_id=55,
+ admin_level=15, country_code='gb',
+ housenumber='4',
+ postcode='34425', wikipedia='en:Faa',
+ rank_search=27, rank_address=26,
+ importance=0.01,
+ centroid=(23, 34),
+ indexed_date=import_date,
+ geometry='LINESTRING(23 34, 23.1 34, 23.1 34.1, 23 34)')
+
+ result = apiobj.api.lookup(idobj, napi.LookupDetails())
+
+ assert result is not None
+
+ assert result.source_table.name == 'PLACEX'
+ assert result.category == ('highway', 'residential')
+ assert result.centroid == (pytest.approx(23.0), pytest.approx(34.0))
+
+ assert result.place_id == 332
+ assert result.parent_place_id == 34
+ assert result.linked_place_id == 55
+ assert result.osm_object == ('W', 4)
+ assert result.admin_level == 15
+
+ assert result.names == {'name': 'Road'}
+ assert result.address == {'city': 'Barrow'}
+ assert result.extratags == {'surface': 'paved'}
+
+ assert result.housenumber == '4'
+ assert result.postcode == '34425'
+ assert result.wikipedia == 'en:Faa'
+
+ assert result.rank_search == 27
+ assert result.rank_address == 26
+ assert result.importance == pytest.approx(0.01)
+
+ assert result.country_code == 'gb'
+ assert result.indexed_date == import_date.replace(tzinfo=dt.timezone.utc)
+
+ assert result.address_rows is None
+ assert result.linked_rows is None
+ assert result.parented_rows is None
+ assert result.name_keywords is None
+ assert result.address_keywords is None
+
+ assert result.geometry == {'type': 'ST_LineString'}
+
+
+def test_lookup_in_placex_minimal_info(apiobj):
+ import_date = dt.datetime(2022, 12, 7, 14, 14, 46, 0)
+ apiobj.add_placex(place_id=332, osm_type='W', osm_id=4,
+ class_='highway', type='residential',
+ admin_level=15,
+ rank_search=27, rank_address=26,
+ centroid=(23, 34),
+ indexed_date=import_date,
+ geometry='LINESTRING(23 34, 23.1 34, 23.1 34.1, 23 34)')
+
+ result = apiobj.api.lookup(napi.PlaceID(332), napi.LookupDetails())
+
+ assert result is not None
+
+ assert result.source_table.name == 'PLACEX'
+ assert result.category == ('highway', 'residential')
+ assert result.centroid == (pytest.approx(23.0), pytest.approx(34.0))
+
+ assert result.place_id == 332
+ assert result.parent_place_id is None
+ assert result.linked_place_id is None
+ assert result.osm_object == ('W', 4)
+ assert result.admin_level == 15
+
+ assert result.names is None
+ assert result.address is None
+ assert result.extratags is None
+
+ assert result.housenumber is None
+ assert result.postcode is None
+ assert result.wikipedia is None
+
+ assert result.rank_search == 27
+ assert result.rank_address == 26
+ assert result.importance is None
+
+ assert result.country_code is None
+ assert result.indexed_date == import_date.replace(tzinfo=dt.timezone.utc)
+
+ assert result.address_rows is None
+ assert result.linked_rows is None
+ assert result.parented_rows is None
+ assert result.name_keywords is None
+ assert result.address_keywords is None
+
+ assert result.geometry == {'type': 'ST_LineString'}
+
+
+def test_lookup_in_placex_with_geometry(apiobj):
+ apiobj.add_placex(place_id=332,
+ geometry='LINESTRING(23 34, 23.1 34)')
+
+ result = apiobj.api.lookup(napi.PlaceID(332),
+ napi.LookupDetails(geometry_output=napi.GeometryFormat.GEOJSON))
+
+ assert result.geometry == {'geojson': '{"type":"LineString","coordinates":[[23,34],[23.1,34]]}'}
+
+
+def test_lookup_placex_with_address_details(apiobj):
+ apiobj.add_placex(place_id=332, osm_type='W', osm_id=4,
+ class_='highway', type='residential', name='Street',
+ country_code='pl',
+ rank_search=27, rank_address=26)
+ apiobj.add_address_placex(332, fromarea=False, isaddress=False,
+ distance=0.0034,
+ place_id=1000, osm_type='N', osm_id=3333,
+ class_='place', type='suburb', name='Smallplace',
+ country_code='pl', admin_level=13,
+ rank_search=24, rank_address=23)
+ apiobj.add_address_placex(332, fromarea=True, isaddress=True,
+ place_id=1001, osm_type='N', osm_id=3334,
+ class_='place', type='city', name='Bigplace',
+ country_code='pl',
+ rank_search=17, rank_address=16)
+
+ result = apiobj.api.lookup(napi.PlaceID(332),
+ napi.LookupDetails(address_details=True))
+
+ assert result.address_rows == [
+ napi.AddressLine(place_id=332, osm_object=('W', 4),
+ category=('highway', 'residential'),
+ names={'name': 'Street'}, extratags={},
+ admin_level=15, fromarea=True, isaddress=True,
+ rank_address=26, distance=0.0),
+ napi.AddressLine(place_id=1000, osm_object=('N', 3333),
+ category=('place', 'suburb'),
+ names={'name': 'Smallplace'}, extratags={},
+ admin_level=13, fromarea=False, isaddress=True,
+ rank_address=23, distance=0.0034),
+ napi.AddressLine(place_id=1001, osm_object=('N', 3334),
+ category=('place', 'city'),
+ names={'name': 'Bigplace'}, extratags={},
+ admin_level=15, fromarea=True, isaddress=True,
+ rank_address=16, distance=0.0),
+ napi.AddressLine(place_id=None, osm_object=None,
+ category=('place', 'country_code'),
+ names={'ref': 'pl'}, extratags={},
+ admin_level=None, fromarea=True, isaddress=False,
+ rank_address=4, distance=0.0)
+ ]
+
+
+def test_lookup_place_with_linked_places_none_existing(apiobj):
+ apiobj.add_placex(place_id=332, osm_type='W', osm_id=4,
+ class_='highway', type='residential', name='Street',
+ country_code='pl', linked_place_id=45,
+ rank_search=27, rank_address=26)
+
+ result = apiobj.api.lookup(napi.PlaceID(332),
+ napi.LookupDetails(linked_places=True))
+
+ assert result.linked_rows == []
+
+
+def test_lookup_place_with_linked_places_existing(apiobj):
+ apiobj.add_placex(place_id=332, osm_type='W', osm_id=4,
+ class_='highway', type='residential', name='Street',
+ country_code='pl', linked_place_id=45,
+ rank_search=27, rank_address=26)
+ apiobj.add_placex(place_id=1001, osm_type='W', osm_id=5,
+ class_='highway', type='residential', name='Street',
+ country_code='pl', linked_place_id=332,
+ rank_search=27, rank_address=26)
+ apiobj.add_placex(place_id=1002, osm_type='W', osm_id=6,
+ class_='highway', type='residential', name='Street',
+ country_code='pl', linked_place_id=332,
+ rank_search=27, rank_address=26)
+
+ result = apiobj.api.lookup(napi.PlaceID(332),
+ napi.LookupDetails(linked_places=True))
+
+ assert result.linked_rows == [
+ napi.AddressLine(place_id=1001, osm_object=('W', 5),
+ category=('highway', 'residential'),
+ names={'name': 'Street'}, extratags={},
+ admin_level=15, fromarea=False, isaddress=True,
+ rank_address=26, distance=0.0),
+ napi.AddressLine(place_id=1002, osm_object=('W', 6),
+ category=('highway', 'residential'),
+ names={'name': 'Street'}, extratags={},
+ admin_level=15, fromarea=False, isaddress=True,
+ rank_address=26, distance=0.0),
+ ]
+
+
+def test_lookup_place_with_parented_places_not_existing(apiobj):
+ apiobj.add_placex(place_id=332, osm_type='W', osm_id=4,
+ class_='highway', type='residential', name='Street',
+ country_code='pl', parent_place_id=45,
+ rank_search=27, rank_address=26)
+
+ result = apiobj.api.lookup(napi.PlaceID(332),
+ napi.LookupDetails(parented_places=True))
+
+ assert result.parented_rows == []
+
+
+def test_lookup_place_with_parented_places_existing(apiobj):
+ apiobj.add_placex(place_id=332, osm_type='W', osm_id=4,
+ class_='highway', type='residential', name='Street',
+ country_code='pl', parent_place_id=45,
+ rank_search=27, rank_address=26)
+ apiobj.add_placex(place_id=1001, osm_type='N', osm_id=5,
+ class_='place', type='house', housenumber='23',
+ country_code='pl', parent_place_id=332,
+ rank_search=30, rank_address=30)
+ apiobj.add_placex(place_id=1002, osm_type='W', osm_id=6,
+ class_='highway', type='residential', name='Street',
+ country_code='pl', parent_place_id=332,
+ rank_search=27, rank_address=26)
+
+ result = apiobj.api.lookup(napi.PlaceID(332),
+ napi.LookupDetails(parented_places=True))
+
+ assert result.parented_rows == [
+ napi.AddressLine(place_id=1001, osm_object=('N', 5),
+ category=('place', 'house'),
+ names={'housenumber': '23'}, extratags={},
+ admin_level=15, fromarea=False, isaddress=True,
+ rank_address=30, distance=0.0),
+ ]
+
+
+@pytest.mark.parametrize('idobj', (napi.PlaceID(4924), napi.OsmID('W', 9928)))
+def test_lookup_in_osmline(apiobj, idobj):
+ import_date = dt.datetime(2022, 12, 7, 14, 14, 46, 0)
+ apiobj.add_osmline(place_id=4924, osm_id=9928,
+ parent_place_id=12,
+ startnumber=1, endnumber=4, step=1,
+ country_code='gb', postcode='34425',
+ address={'city': 'Big'},
+ indexed_date=import_date,
+ geometry='LINESTRING(23 34, 23 35)')
+
+ result = apiobj.api.lookup(idobj, napi.LookupDetails())
+
+ assert result is not None
+
+ assert result.source_table.name == 'OSMLINE'
+ assert result.category == ('place', 'houses')
+ assert result.centroid == (pytest.approx(23.0), pytest.approx(34.5))
+
+ assert result.place_id == 4924
+ assert result.parent_place_id == 12
+ assert result.linked_place_id is None
+ assert result.osm_object == ('W', 9928)
+ assert result.admin_level == 15
+
+ assert result.names is None
+ assert result.address == {'city': 'Big'}
+ assert result.extratags == {'startnumber': '1', 'endnumber': '4', 'step': '1'}
+
+ assert result.housenumber is None
+ assert result.postcode == '34425'
+ assert result.wikipedia is None
+
+ assert result.rank_search == 30
+ assert result.rank_address == 30
+ assert result.importance is None
+
+ assert result.country_code == 'gb'
+ assert result.indexed_date == import_date.replace(tzinfo=dt.timezone.utc)
+
+ assert result.address_rows is None
+ assert result.linked_rows is None
+ assert result.parented_rows is None
+ assert result.name_keywords is None
+ assert result.address_keywords is None
+
+ assert result.geometry == {'type': 'ST_LineString'}
+
+
+def test_lookup_in_osmline_split_interpolation(apiobj):
+ apiobj.add_osmline(place_id=1000, osm_id=9,
+ startnumber=2, endnumber=4, step=1)
+ apiobj.add_osmline(place_id=1001, osm_id=9,
+ startnumber=6, endnumber=9, step=1)
+ apiobj.add_osmline(place_id=1002, osm_id=9,
+ startnumber=11, endnumber=20, step=1)
+
+ for i in range(1, 6):
+ result = apiobj.api.lookup(napi.OsmID('W', 9, str(i)), napi.LookupDetails())
+ assert result.place_id == 1000
+ for i in range(7, 11):
+ result = apiobj.api.lookup(napi.OsmID('W', 9, str(i)), napi.LookupDetails())
+ assert result.place_id == 1001
+ for i in range(12, 22):
+ result = apiobj.api.lookup(napi.OsmID('W', 9, str(i)), napi.LookupDetails())
+ assert result.place_id == 1002
+
+
+def test_lookup_osmline_with_address_details(apiobj):
+ apiobj.add_osmline(place_id=9000, osm_id=9,
+ startnumber=2, endnumber=4, step=1,
+ parent_place_id=332)
+ apiobj.add_placex(place_id=332, osm_type='W', osm_id=4,
+ class_='highway', type='residential', name='Street',
+ country_code='pl',
+ rank_search=27, rank_address=26)
+ apiobj.add_address_placex(332, fromarea=False, isaddress=False,
+ distance=0.0034,
+ place_id=1000, osm_type='N', osm_id=3333,
+ class_='place', type='suburb', name='Smallplace',
+ country_code='pl', admin_level=13,
+ rank_search=24, rank_address=23)
+ apiobj.add_address_placex(332, fromarea=True, isaddress=True,
+ place_id=1001, osm_type='N', osm_id=3334,
+ class_='place', type='city', name='Bigplace',
+ country_code='pl',
+ rank_search=17, rank_address=16)
+
+ result = apiobj.api.lookup(napi.PlaceID(9000),
+ napi.LookupDetails(address_details=True))
+
+ assert result.address_rows == [
+ napi.AddressLine(place_id=None, osm_object=None,
+ category=('place', 'house_number'),
+ names={'ref': '2'}, extratags={},
+ admin_level=None, fromarea=True, isaddress=True,
+ rank_address=28, distance=0.0),
+ napi.AddressLine(place_id=332, osm_object=('W', 4),
+ category=('highway', 'residential'),
+ names={'name': 'Street'}, extratags={},
+ admin_level=15, fromarea=True, isaddress=True,
+ rank_address=26, distance=0.0),
+ napi.AddressLine(place_id=1000, osm_object=('N', 3333),
+ category=('place', 'suburb'),
+ names={'name': 'Smallplace'}, extratags={},
+ admin_level=13, fromarea=False, isaddress=True,
+ rank_address=23, distance=0.0034),
+ napi.AddressLine(place_id=1001, osm_object=('N', 3334),
+ category=('place', 'city'),
+ names={'name': 'Bigplace'}, extratags={},
+ admin_level=15, fromarea=True, isaddress=True,
+ rank_address=16, distance=0.0),
+ napi.AddressLine(place_id=None, osm_object=None,
+ category=('place', 'country_code'),
+ names={'ref': 'pl'}, extratags={},
+ admin_level=None, fromarea=True, isaddress=False,
+ rank_address=4, distance=0.0)
+ ]
+
+
+def test_lookup_in_tiger(apiobj):
+ apiobj.add_tiger(place_id=4924,
+ parent_place_id=12,
+ startnumber=1, endnumber=4, step=1,
+ postcode='34425',
+ geometry='LINESTRING(23 34, 23 35)')
+
+ result = apiobj.api.lookup(napi.PlaceID(4924), napi.LookupDetails())
+
+ assert result is not None
+
+ assert result.source_table.name == 'TIGER'
+ assert result.category == ('place', 'houses')
+ assert result.centroid == (pytest.approx(23.0), pytest.approx(34.5))
+
+ assert result.place_id == 4924
+ assert result.parent_place_id == 12
+ assert result.linked_place_id is None
+ assert result.osm_object is None
+ assert result.admin_level == 15
+
+ assert result.names is None
+ assert result.address is None
+ assert result.extratags == {'startnumber': '1', 'endnumber': '4', 'step': '1'}
+
+ assert result.housenumber is None
+ assert result.postcode == '34425'
+ assert result.wikipedia is None
+
+ assert result.rank_search == 30
+ assert result.rank_address == 30
+ assert result.importance is None
+
+ assert result.country_code == 'us'
+ assert result.indexed_date is None
+
+ assert result.address_rows is None
+ assert result.linked_rows is None
+ assert result.parented_rows is None
+ assert result.name_keywords is None
+ assert result.address_keywords is None
+
+ assert result.geometry == {'type': 'ST_LineString'}
+
+
+def test_lookup_tiger_with_address_details(apiobj):
+ apiobj.add_tiger(place_id=9000,
+ startnumber=2, endnumber=4, step=1,
+ parent_place_id=332)
+ apiobj.add_placex(place_id=332, osm_type='W', osm_id=4,
+ class_='highway', type='residential', name='Street',
+ country_code='us',
+ rank_search=27, rank_address=26)
+ apiobj.add_address_placex(332, fromarea=False, isaddress=False,
+ distance=0.0034,
+ place_id=1000, osm_type='N', osm_id=3333,
+ class_='place', type='suburb', name='Smallplace',
+ country_code='us', admin_level=13,
+ rank_search=24, rank_address=23)
+ apiobj.add_address_placex(332, fromarea=True, isaddress=True,
+ place_id=1001, osm_type='N', osm_id=3334,
+ class_='place', type='city', name='Bigplace',
+ country_code='us',
+ rank_search=17, rank_address=16)
+
+ result = apiobj.api.lookup(napi.PlaceID(9000),
+ napi.LookupDetails(address_details=True))
+
+ assert result.address_rows == [
+ napi.AddressLine(place_id=None, osm_object=None,
+ category=('place', 'house_number'),
+ names={'ref': '2'}, extratags={},
+ admin_level=None, fromarea=True, isaddress=True,
+ rank_address=28, distance=0.0),
+ napi.AddressLine(place_id=332, osm_object=('W', 4),
+ category=('highway', 'residential'),
+ names={'name': 'Street'}, extratags={},
+ admin_level=15, fromarea=True, isaddress=True,
+ rank_address=26, distance=0.0),
+ napi.AddressLine(place_id=1000, osm_object=('N', 3333),
+ category=('place', 'suburb'),
+ names={'name': 'Smallplace'}, extratags={},
+ admin_level=13, fromarea=False, isaddress=True,
+ rank_address=23, distance=0.0034),
+ napi.AddressLine(place_id=1001, osm_object=('N', 3334),
+ category=('place', 'city'),
+ names={'name': 'Bigplace'}, extratags={},
+ admin_level=15, fromarea=True, isaddress=True,
+ rank_address=16, distance=0.0),
+ napi.AddressLine(place_id=None, osm_object=None,
+ category=('place', 'country_code'),
+ names={'ref': 'us'}, extratags={},
+ admin_level=None, fromarea=True, isaddress=False,
+ rank_address=4, distance=0.0)
+ ]
+
+
+def test_lookup_in_postcode(apiobj):
+ import_date = dt.datetime(2022, 12, 7, 14, 14, 46, 0)
+ apiobj.add_postcode(place_id=554,
+ parent_place_id=152,
+ postcode='34 425',
+ country_code='gb',
+ rank_search=20, rank_address=22,
+ indexed_date=import_date,
+ geometry='POINT(-9.45 5.6)')
+
+ result = apiobj.api.lookup(napi.PlaceID(554), napi.LookupDetails())
+
+ assert result is not None
+
+ assert result.source_table.name == 'POSTCODE'
+ assert result.category == ('place', 'postcode')
+ assert result.centroid == (pytest.approx(-9.45), pytest.approx(5.6))
+
+ assert result.place_id == 554
+ assert result.parent_place_id == 152
+ assert result.linked_place_id is None
+ assert result.osm_object is None
+ assert result.admin_level == 15
+
+ assert result.names == {'ref': '34 425'}
+ assert result.address is None
+ assert result.extratags is None
+
+ assert result.housenumber is None
+ assert result.postcode is None
+ assert result.wikipedia is None
+
+ assert result.rank_search == 20
+ assert result.rank_address == 22
+ assert result.importance is None
+
+ assert result.country_code == 'gb'
+ assert result.indexed_date == import_date.replace(tzinfo=dt.timezone.utc)
+
+ assert result.address_rows is None
+ assert result.linked_rows is None
+ assert result.parented_rows is None
+ assert result.name_keywords is None
+ assert result.address_keywords is None
+
+ assert result.geometry == {'type': 'ST_Point'}
+
+
+def test_lookup_postcode_with_address_details(apiobj):
+ apiobj.add_postcode(place_id=9000,
+ parent_place_id=332,
+ postcode='34 425',
+ country_code='gb',
+ rank_search=25, rank_address=25)
+ apiobj.add_placex(place_id=332, osm_type='N', osm_id=3333,
+ class_='place', type='suburb', name='Smallplace',
+ country_code='gb', admin_level=13,
+ rank_search=24, rank_address=23)
+ apiobj.add_address_placex(332, fromarea=True, isaddress=True,
+ place_id=1001, osm_type='N', osm_id=3334,
+ class_='place', type='city', name='Bigplace',
+ country_code='gb',
+ rank_search=17, rank_address=16)
+
+ result = apiobj.api.lookup(napi.PlaceID(9000),
+ napi.LookupDetails(address_details=True))
+
+ assert result.address_rows == [
+ napi.AddressLine(place_id=332, osm_object=('N', 3333),
+ category=('place', 'suburb'),
+ names={'name': 'Smallplace'}, extratags={},
+ admin_level=13, fromarea=True, isaddress=True,
+ rank_address=23, distance=0.0),
+ napi.AddressLine(place_id=1001, osm_object=('N', 3334),
+ category=('place', 'city'),
+ names={'name': 'Bigplace'}, extratags={},
+ admin_level=15, fromarea=True, isaddress=True,
+ rank_address=16, distance=0.0),
+ napi.AddressLine(place_id=None, osm_object=None,
+ category=('place', 'postcode'),
+ names={'ref': '34 425'}, extratags={},
+ admin_level=None, fromarea=False, isaddress=True,
+ rank_address=5, distance=0.0),
+ napi.AddressLine(place_id=None, osm_object=None,
+ category=('place', 'country_code'),
+ names={'ref': 'gb'}, extratags={},
+ admin_level=None, fromarea=True, isaddress=False,
+ rank_address=4, distance=0.0)
+ ]
+
+@pytest.mark.parametrize('objid', [napi.PlaceID(1736),
+ napi.OsmID('W', 55),
+ napi.OsmID('N', 55, 'amenity')])
+def test_lookup_missing_object(apiobj, objid):
+ apiobj.add_placex(place_id=1, osm_type='N', osm_id=55,
+ class_='place', type='suburb')
+
+ assert apiobj.api.lookup(objid, napi.LookupDetails()) is None
+
+
+@pytest.mark.parametrize('gtype', (napi.GeometryFormat.KML,
+ napi.GeometryFormat.SVG,
+ napi.GeometryFormat.TEXT))
+def test_lookup_unsupported_geometry(apiobj, gtype):
+ apiobj.add_placex(place_id=332)
+
+ with pytest.raises(ValueError):
+ apiobj.api.lookup(napi.PlaceID(332),
+ napi.LookupDetails(geometry_output=gtype))
#
# This file is part of Nominatim. (https://nominatim.org)
#
-# Copyright (C) 2022 by the Nominatim developer community.
+# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Tests for the status API call.
import pytest
from nominatim.version import NOMINATIM_VERSION, NominatimVersion
-from nominatim.api import NominatimAPI
+import nominatim.api as napi
-def test_status_no_extra_info(apiobj, table_factory):
- table_factory('import_status',
- definition="lastimportdate timestamp with time zone NOT NULL")
- table_factory('nominatim_properties',
- definition='property TEXT, value TEXT')
-
- result = apiobj.status()
+def test_status_no_extra_info(apiobj):
+ result = apiobj.api.status()
assert result.status == 0
assert result.message == 'OK'
assert result.data_updated is None
-def test_status_full(apiobj, table_factory):
- table_factory('import_status',
- definition="lastimportdate timestamp with time zone NOT NULL",
- content=(('2022-12-07 15:14:46+01',),))
- table_factory('nominatim_properties',
- definition='property TEXT, value TEXT',
- content=(('database_version', '99.5.4-2'), ))
+def test_status_full(apiobj):
+ import_date = dt.datetime(2022, 12, 7, 14, 14, 46, 0, tzinfo=dt.timezone.utc)
+ apiobj.add_data('import_status',
+ [{'lastimportdate': import_date}])
+ apiobj.add_data('properties',
+ [{'property': 'database_version', 'value': '99.5.4-2'}])
- result = apiobj.status()
+ result = apiobj.api.status()
assert result.status == 0
assert result.message == 'OK'
assert result.software_version == NOMINATIM_VERSION
assert result.database_version == NominatimVersion(99, 5, 4, 2)
- assert result.data_updated == dt.datetime(2022, 12, 7, 14, 14, 46, 0, tzinfo=dt.timezone.utc)
+ assert result.data_updated == import_date
def test_status_database_not_found(monkeypatch):
monkeypatch.setenv('NOMINATIM_DATABASE_DSN', 'dbname=rgjdfkgjedkrgdfkngdfkg')
- api = NominatimAPI(Path('/invalid'), {})
+ api = napi.NominatimAPI(Path('/invalid'), {})
result = api.status()
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Test functions for adapting results to the user's locale.
+"""
+import pytest
+
+from nominatim.api import Locales
+
+def test_display_name_empty_names():
+ l = Locales(['en', 'de'])
+
+ assert l.display_name(None) == ''
+ assert l.display_name({}) == ''
+
+def test_display_name_none_localized():
+ l = Locales()
+
+ assert l.display_name({}) == ''
+ assert l.display_name({'name:de': 'DE', 'name': 'ALL'}) == 'ALL'
+ assert l.display_name({'ref': '34', 'name:de': 'DE'}) == '34'
+
+
+def test_display_name_localized():
+ l = Locales(['en', 'de'])
+
+ assert l.display_name({}) == ''
+ assert l.display_name({'name:de': 'DE', 'name': 'ALL'}) == 'DE'
+ assert l.display_name({'ref': '34', 'name:de': 'DE'}) == 'DE'
+
+
+def test_display_name_preference():
+ l = Locales(['en', 'de'])
+
+ assert l.display_name({}) == ''
+ assert l.display_name({'name:de': 'DE', 'name:en': 'EN'}) == 'EN'
+ assert l.display_name({'official_name:en': 'EN', 'name:de': 'DE'}) == 'DE'
+
+
+@pytest.mark.parametrize('langstr,langlist',
+ [('fr', ['fr']),
+ ('fr-FR', ['fr-FR', 'fr']),
+ ('de,fr-FR', ['de', 'fr-FR', 'fr']),
+ ('fr,de,fr-FR', ['fr', 'de', 'fr-FR']),
+ ('en;q=0.5,fr', ['fr', 'en']),
+ ('en;q=0.5,fr,en-US', ['fr', 'en-US', 'en']),
+ ('en,fr;garbage,de', ['en', 'de'])])
+def test_from_language_preferences(langstr, langlist):
+ assert Locales.from_accept_languages(langstr).languages == langlist
--- /dev/null
+# SPDX-License-Identifier: GPL-2.0-only
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Tests for formatting results for the V1 API.
+"""
+import datetime as dt
+import json
+
+import pytest
+
+import nominatim.api.v1 as api_impl
+import nominatim.api as napi
+from nominatim.version import NOMINATIM_VERSION
+
+STATUS_FORMATS = {'text', 'json'}
+
+# StatusResult
+
+def test_status_format_list():
+ assert set(api_impl.list_formats(napi.StatusResult)) == STATUS_FORMATS
+
+
+@pytest.mark.parametrize('fmt', list(STATUS_FORMATS))
+def test_status_supported(fmt):
+ assert api_impl.supports_format(napi.StatusResult, fmt)
+
+
+def test_status_unsupported():
+ assert not api_impl.supports_format(napi.StatusResult, 'gagaga')
+
+
+def test_status_format_text():
+ assert api_impl.format_result(napi.StatusResult(0, 'message here'), 'text', {}) == 'OK'
+
+
+def test_status_format_text_error():
+ assert api_impl.format_result(napi.StatusResult(500, 'message here'), 'text', {}) == 'ERROR: message here'
+
+
+def test_status_format_json_minimal():
+ status = napi.StatusResult(700, 'Bad format.')
+
+ result = api_impl.format_result(status, 'json', {})
+
+ assert result == '{"status":700,"message":"Bad format.","software_version":"%s"}' % (NOMINATIM_VERSION, )
+
+
+def test_status_format_json_full():
+ status = napi.StatusResult(0, 'OK')
+ status.data_updated = dt.datetime(2010, 2, 7, 20, 20, 3, 0, tzinfo=dt.timezone.utc)
+ status.database_version = '5.6'
+
+ result = api_impl.format_result(status, 'json', {})
+
+ assert result == '{"status":0,"message":"OK","data_updated":"2010-02-07T20:20:03+00:00","software_version":"%s","database_version":"5.6"}' % (NOMINATIM_VERSION, )
+
+
+# SearchResult
+
+def test_search_details_minimal():
+ search = napi.SearchResult(napi.SourceTable.PLACEX,
+ ('place', 'thing'),
+ napi.Point(1.0, 2.0))
+
+ result = api_impl.format_result(search, 'details-json', {})
+
+ assert json.loads(result) == \
+ {'category': 'place',
+ 'type': 'thing',
+ 'admin_level': 15,
+ 'localname': '',
+ 'calculated_importance': pytest.approx(0.0000001),
+ 'rank_address': 30,
+ 'rank_search': 30,
+ 'isarea': False,
+ 'centroid': {'type': 'Point', 'coordinates': [1.0, 2.0]},
+ 'geometry': {'type': 'Point', 'coordinates': [1.0, 2.0]},
+ }
+
+
+def test_search_details_full():
+ import_date = dt.datetime(2010, 2, 7, 20, 20, 3, 0)
+ search = napi.SearchResult(
+ source_table=napi.SourceTable.PLACEX,
+ category=('amenity', 'bank'),
+ centroid=napi.Point(56.947, -87.44),
+ place_id=37563,
+ parent_place_id=114,
+ linked_place_id=55693,
+ osm_object=('W', 442100),
+ admin_level=14,
+ names={'name': 'Bank', 'name:fr': 'Banque'},
+ address={'city': 'Niento', 'housenumber': ' 3'},
+ extratags={'atm': 'yes'},
+ housenumber='3',
+ postcode='556 X23',
+ wikipedia='en:Bank',
+ rank_address=29,
+ rank_search=28,
+ importance=0.0443,
+ country_code='ll',
+ indexed_date = import_date
+ )
+
+ result = api_impl.format_result(search, 'details-json', {})
+
+ assert json.loads(result) == \
+ {'place_id': 37563,
+ 'parent_place_id': 114,
+ 'osm_type': 'W',
+ 'osm_id': 442100,
+ 'category': 'amenity',
+ 'type': 'bank',
+ 'admin_level': 14,
+ 'localname': 'Bank',
+ 'names': {'name': 'Bank', 'name:fr': 'Banque'},
+ 'addresstags': {'city': 'Niento', 'housenumber': ' 3'},
+ 'housenumber': '3',
+ 'calculated_postcode': '556 X23',
+ 'country_code': 'll',
+ 'indexed_date': '2010-02-07T20:20:03+00:00',
+ 'importance': pytest.approx(0.0443),
+ 'calculated_importance': pytest.approx(0.0443),
+ 'extratags': {'atm': 'yes'},
+ 'calculated_wikipedia': 'en:Bank',
+ 'rank_address': 29,
+ 'rank_search': 28,
+ 'isarea': False,
+ 'centroid': {'type': 'Point', 'coordinates': [56.947, -87.44]},
+ 'geometry': {'type': 'Point', 'coordinates': [56.947, -87.44]},
+ }
+
+
+@pytest.mark.parametrize('gtype,isarea', [('ST_Point', False),
+ ('ST_LineString', False),
+ ('ST_Polygon', True),
+ ('ST_MultiPolygon', True)])
+def test_search_details_no_geometry(gtype, isarea):
+ search = napi.SearchResult(napi.SourceTable.PLACEX,
+ ('place', 'thing'),
+ napi.Point(1.0, 2.0),
+ geometry={'type': gtype})
+
+ result = api_impl.format_result(search, 'details-json', {})
+ js = json.loads(result)
+
+ assert js['geometry'] == {'type': 'Point', 'coordinates': [1.0, 2.0]}
+ assert js['isarea'] == isarea
+
+
+def test_search_details_with_geometry():
+ search = napi.SearchResult(napi.SourceTable.PLACEX,
+ ('place', 'thing'),
+ napi.Point(1.0, 2.0),
+ geometry={'geojson': '{"type":"Point","coordinates":[56.947,-87.44]}'})
+
+ result = api_impl.format_result(search, 'details-json', {})
+ js = json.loads(result)
+
+ assert js['geometry'] == {'type': 'Point', 'coordinates': [56.947, -87.44]}
+ assert js['isarea'] == False
+
+
+def test_search_details_with_address_minimal():
+ search = napi.SearchResult(napi.SourceTable.PLACEX,
+ ('place', 'thing'),
+ napi.Point(1.0, 2.0),
+ address_rows=[
+ napi.AddressLine(place_id=None,
+ osm_object=None,
+ category=('bnd', 'note'),
+ names={},
+ extratags=None,
+ admin_level=None,
+ fromarea=False,
+ isaddress=False,
+ rank_address=10,
+ distance=0.0)
+ ])
+
+ result = api_impl.format_result(search, 'details-json', {})
+ js = json.loads(result)
+
+ assert js['address'] == [{'localname': '',
+ 'class': 'bnd',
+ 'type': 'note',
+ 'rank_address': 10,
+ 'distance': 0.0,
+ 'isaddress': False}]
+
+
+def test_search_details_with_address_full():
+ search = napi.SearchResult(napi.SourceTable.PLACEX,
+ ('place', 'thing'),
+ napi.Point(1.0, 2.0),
+ address_rows=[
+ napi.AddressLine(place_id=3498,
+ osm_object=('R', 442),
+ category=('bnd', 'note'),
+ names={'name': 'Trespass'},
+ extratags={'access': 'no',
+ 'place_type': 'spec'},
+ admin_level=4,
+ fromarea=True,
+ isaddress=True,
+ rank_address=10,
+ distance=0.034)
+ ])
+
+ result = api_impl.format_result(search, 'details-json', {})
+ js = json.loads(result)
+
+ assert js['address'] == [{'localname': 'Trespass',
+ 'place_id': 3498,
+ 'osm_id': 442,
+ 'osm_type': 'R',
+ 'place_type': 'spec',
+ 'class': 'bnd',
+ 'type': 'note',
+ 'admin_level': 4,
+ 'rank_address': 10,
+ 'distance': 0.034,
+ 'isaddress': True}]
#
# This file is part of Nominatim. (https://nominatim.org)
#
-# Copyright (C) 2022 by the Nominatim developer community.
+# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Tests for API access commands of command-line interface wrapper.
import pytest
import nominatim.clicmd.api
-import nominatim.api
-from nominatim.apicmd.status import StatusResult
+import nominatim.api as napi
@pytest.mark.parametrize("endpoint", (('search', 'reverse', 'lookup', 'details', 'status')))
@pytest.mark.parametrize("params", [('search', '--query', 'new'),
('search', '--city', 'Berlin'),
('reverse', '--lat', '0', '--lon', '0', '--zoom', '13'),
- ('lookup', '--id', 'N1'),
- ('details', '--node', '1'),
- ('details', '--way', '1'),
- ('details', '--relation', '1'),
- ('details', '--place_id', '10001')])
+ ('lookup', '--id', 'N1')])
class TestCliApiCallPhp:
@pytest.fixture(autouse=True)
@pytest.fixture(autouse=True)
def setup_status_mock(self, monkeypatch):
- monkeypatch.setattr(nominatim.api.NominatimAPI, 'status',
- lambda self: StatusResult(200, 'OK'))
+ monkeypatch.setattr(napi.NominatimAPI, 'status',
+ lambda self: napi.StatusResult(200, 'OK'))
def test_status_simple(self, cli_call, tmp_path):
json.loads(capsys.readouterr().out)
+class TestCliDetailsCall:
+
+ @pytest.fixture(autouse=True)
+    def setup_lookup_mock(self, monkeypatch):
+ result = napi.SearchResult(napi.SourceTable.PLACEX, ('place', 'thing'),
+                                   napi.Point(1.0, -3.0))
+
+ monkeypatch.setattr(napi.NominatimAPI, 'lookup',
+ lambda *args: result)
+
+ @pytest.mark.parametrize("params", [('--node', '1'),
+ ('--way', '1'),
+ ('--relation', '1'),
+ ('--place_id', '10001')])
+    def test_details_json_format(self, cli_call, tmp_path, capsys, params):
+ result = cli_call('details', '--project-dir', str(tmp_path), *params)
+
+ assert result == 0
+
+ json.loads(capsys.readouterr().out)
+
+
QUERY_PARAMS = {
'search': ('--query', 'somewhere'),
'reverse': ('--lat', '20', '--lon', '30'),
assert cli_call('search', *QUERY_PARAMS['search'], '--project-dir', str(project_env.project_dir),
'--no-dedupe') == 0
-
-
-def test_cli_details_param_class(cli_call, project_env):
- webdir = project_env.project_dir / 'website'
- webdir.mkdir()
- (webdir / 'details.php').write_text(f"""<?php
- exit($_GET['class'] == 'highway' ? 0 : 10);
- """)
-
- assert cli_call('details', *QUERY_PARAMS['details'], '--project-dir', str(project_env.project_dir),
- '--class', 'highway') == 0
-
-
-@pytest.mark.parametrize('param', ('lang', 'accept-language'))
-def test_cli_details_param_lang(cli_call, project_env, param):
- webdir = project_env.project_dir / 'website'
- webdir.mkdir()
- (webdir / 'details.php').write_text(f"""<?php
- exit($_GET['accept-language'] == 'es' ? 0 : 10);
- """)
-
- assert cli_call('details', *QUERY_PARAMS['details'], '--project-dir', str(project_env.project_dir),
- '--' + param, 'es') == 0
-
+++ /dev/null
-# SPDX-License-Identifier: GPL-2.0-only
-#
-# This file is part of Nominatim. (https://nominatim.org)
-#
-# Copyright (C) 2022 by the Nominatim developer community.
-# For a full list of authors see the git log.
-"""
-Tests for formatting results for the V1 API.
-"""
-import datetime as dt
-import pytest
-
-import nominatim.result_formatter.v1 as format_module
-from nominatim.apicmd.status import StatusResult
-from nominatim.version import NOMINATIM_VERSION
-
-STATUS_FORMATS = {'text', 'json'}
-
-class TestStatusResultFormat:
-
-
- @pytest.fixture(autouse=True)
- def make_formatter(self):
- self.formatter = format_module.create(StatusResult)
-
-
- def test_format_list(self):
- assert set(self.formatter.list_formats()) == STATUS_FORMATS
-
-
- @pytest.mark.parametrize('fmt', list(STATUS_FORMATS))
- def test_supported(self, fmt):
- assert self.formatter.supports_format(fmt)
-
-
- def test_unsupported(self):
- assert not self.formatter.supports_format('gagaga')
-
-
- def test_format_text(self):
- assert self.formatter.format(StatusResult(0, 'message here'), 'text') == 'OK'
-
-
- def test_format_text(self):
- assert self.formatter.format(StatusResult(500, 'message here'), 'text') == 'ERROR: message here'
-
-
- def test_format_json_minimal(self):
- status = StatusResult(700, 'Bad format.')
-
- result = self.formatter.format(status, 'json')
-
- assert result == '{"status": 700, "message": "Bad format.", "software_version": "%s"}' % (NOMINATIM_VERSION, )
-
-
- def test_format_json_full(self):
- status = StatusResult(0, 'OK')
- status.data_updated = dt.datetime(2010, 2, 7, 20, 20, 3, 0, tzinfo=dt.timezone.utc)
- status.database_version = '5.6'
-
- result = self.formatter.format(status, 'json')
-
- assert result == '{"status": 0, "message": "OK", "data_updated": "2010-02-07T20:20:03+00:00", "software_version": "%s", "database_version": "5.6"}' % (NOMINATIM_VERSION, )
def test_recompute_importance(placex_table, table_factory, temp_db_conn, temp_db_cursor):
temp_db_cursor.execute("""CREATE OR REPLACE FUNCTION compute_importance(extratags HSTORE,
country_code varchar(2),
- osm_type varchar(1), osm_id BIGINT,
+ rank_search SMALLINT,
centroid GEOMETRY,
OUT importance FLOAT,
OUT wikipedia TEXT)
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Tests for the streaming JSON writer.
+"""
+import json
+
+import pytest
+
+from nominatim.utils.json_writer import JsonWriter
+
+@pytest.mark.parametrize("inval,outstr", [(None, 'null'),
+ (True, 'true'), (False, 'false'),
+ (23, '23'), (0, '0'), (-1.3, '-1.3'),
+ ('g\nä', '"g\\nä"'), ('"', '"\\\""'),
+ ({}, '{}'), ([], '[]')])
+def test_simple_value(inval, outstr):
+ writer = JsonWriter()
+ writer.value(inval)
+
+ assert writer() == outstr
+ json.loads(writer())
+
+
+def test_empty_array():
+ writer = JsonWriter().start_array().end_array()
+
+ assert writer() == '[]'
+ json.loads(writer())
+
+
+def test_array_with_single_value():
+ writer = JsonWriter().start_array().value(None).end_array()
+
+ assert writer() == '[null]'
+ json.loads(writer())
+
+
+@pytest.mark.parametrize("invals,outstr", [((1, ), '[1]'),
+ (('a', 'b'), '["a","b"]')])
+def test_array_with_data(invals, outstr):
+ writer = JsonWriter()
+
+ writer.start_array()
+ for ival in invals:
+ writer.value(ival).next()
+ writer.end_array()
+
+ assert writer() == outstr
+ json.loads(writer())
+
+
+def test_empty_object():
+ writer = JsonWriter().start_object().end_object()
+
+ assert writer() == '{}'
+ json.loads(writer())
+
+
+def test_object_single_entry():
+ writer = JsonWriter()\
+ .start_object()\
+ .key('something')\
+ .value(5)\
+ .end_object()
+
+ assert writer() == '{"something":5}'
+ json.loads(writer())
+
+def test_object_many_values():
+ writer = JsonWriter()\
+ .start_object()\
+ .keyval('foo', None)\
+ .keyval('bar', {})\
+ .keyval('baz', 'b\taz')\
+ .end_object()
+
+ assert writer() == '{"foo":null,"bar":{},"baz":"b\\taz"}'
+ json.loads(writer())
+
+def test_object_many_values_without_none():
+ writer = JsonWriter()\
+ .start_object()\
+ .keyval_not_none('foo', 0)\
+ .keyval_not_none('bar', None)\
+ .keyval_not_none('baz', '')\
+ .keyval_not_none('eve', False,
+ transform = lambda v: 'yes' if v else 'no')\
+ .end_object()
+
+ assert writer() == '{"foo":0,"baz":"","eve":"no"}'
+ json.loads(writer())
+
+
+def test_raw_output():
+ writer = JsonWriter()\
+ .start_array()\
+ .raw('{ "nicely": "formatted here" }').next()\
+ .value(1)\
+ .end_array()
+
+ assert writer() == '[{ "nicely": "formatted here" },1]'
# Some of the Python packages that come with Ubuntu 20.04 are too old, so
# install the latest version from pip:
- pip3 install --user sqlalchemy asyncpg
+ pip3 install --user sqlalchemy GeoAlchemy2 asyncpg
#
php-cli php-pgsql php-intl libicu-dev python3-dotenv \
python3-psycopg2 python3-psutil python3-jinja2 \
python3-icu python3-datrie python3-sqlalchemy \
- python3-asyncpg git
+ python3-geoalchemy2 python3-asyncpg git
#
# System Configuration