+++ /dev/null
-[MASTER]
-
-extension-pkg-whitelist=osmium,falcon
-ignored-modules=icu,datrie
-
-[MESSAGES CONTROL]
-
-[TYPECHECK]
-
-# closing added here because it sometimes triggers a false positive with
-# 'with' statements.
-ignored-classes=NominatimArgs,closing
-# 'too-many-ancestors' is triggered already by deriving from UserDict
-# 'not-context-manager' disabled because it causes false positives once
-# typed Python is enabled. See also https://github.com/PyCQA/pylint/issues/5273
-disable=too-few-public-methods,duplicate-code,too-many-ancestors,bad-option-value,no-self-use,not-context-manager,use-dict-literal,chained-comparison,attribute-defined-outside-init,too-many-boolean-expressions,contextmanager-generator-missing-cleanup,too-many-positional-arguments
-
-good-names=i,j,x,y,m,t,fd,db,cc,x1,x2,y1,y2,pt,k,v,nr
-
-[DESIGN]
-
-max-returns=7
# This file is just a placeholder to make the config module available
# during development. It will be replaced by nominatim_db/config.py on
# installation.
-# pylint: skip-file
+# flake8: noqa
from nominatim_db.config import *
T = TypeVar('T')
+
class SearchConnection:
""" An extended SQLAlchemy connection class, that also contains
the table definitions. The underlying asynchronous SQLAlchemy
tables: SearchTables,
properties: Dict[str, Any]) -> None:
self.connection = conn
- self.t = tables # pylint: disable=invalid-name
+ self.t = tables
self._property_cache = properties
self._classtables: Optional[Set[str]] = None
self.query_timeout: Optional[int] = None
-
def set_query_timeout(self, timeout: Optional[int]) -> None:
""" Set the timeout after which a query over this connection
is cancelled.
"""
self.query_timeout = timeout
-
async def scalar(self, sql: sa.sql.base.Executable,
- params: Union[Mapping[str, Any], None] = None
- ) -> Any:
+ params: Union[Mapping[str, Any], None] = None) -> Any:
""" Execute a 'scalar()' query on the connection.
"""
log().sql(self.connection, sql, params)
return await asyncio.wait_for(self.connection.scalar(sql, params), self.query_timeout)
-
async def execute(self, sql: 'sa.Executable',
params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None] = None
- ) -> 'sa.Result[Any]':
+ ) -> 'sa.Result[Any]':
""" Execute a 'execute()' query on the connection.
"""
log().sql(self.connection, sql, params)
return await asyncio.wait_for(self.connection.execute(sql, params), self.query_timeout)
-
async def get_property(self, name: str, cached: bool = True) -> str:
""" Get a property from Nominatim's property table.
return cast(str, value)
-
async def get_db_property(self, name: str) -> Any:
""" Get a setting from the database. At the moment, only
'server_version', the version of the database software, can
return self._property_cache['DB:server_version']
-
async def get_cached_value(self, group: str, name: str,
factory: Callable[[], Awaitable[T]]) -> T:
""" Access the cache for this Nominatim instance.
return value
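    # Editorial usage sketch, not part of the change: the factory coroutine is
    # presumably awaited only when nothing is cached yet under the given
    # group/name pair.  The group, name and factory below are invented examples.
    #
    #     async def _load_defaults() -> Dict[str, str]:
    #         return {'de': 'Deutschland'}
    #
    #     names = await conn.get_cached_value('demo', 'country_names', _load_defaults)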
-
async def get_class_table(self, cls: str, typ: str) -> Optional[SaFromClause]:
""" Lookup up if there is a classtype table for the given category
and return a SQLAlchemy table for it, if it exists.
"""
Implementation of classes for API access via libraries.
"""
-from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List,\
+from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List, \
Union, Tuple, cast
import asyncio
import sys
from .sql.sqlalchemy_schema import SearchTables
from .sql.async_core_library import PGCORE_LIB, PGCORE_ERROR
from .config import Configuration
-from .sql import sqlite_functions, sqlalchemy_functions #pylint: disable=unused-import
+from .sql import sqlite_functions, sqlalchemy_functions  # noqa: F401
from .connection import SearchConnection
from .status import get_status, StatusResult
from .lookup import get_detailed_place, get_simple_place
from .results import DetailedResult, ReverseResult, SearchResults
-class NominatimAPIAsync: #pylint: disable=too-many-instance-attributes
+class NominatimAPIAsync:
""" The main frontend to the Nominatim database implements the
functions for lookup, forward and reverse geocoding using
asynchronous functions.
"""
self.config = Configuration(project_dir, environ)
self.query_timeout = self.config.get_int('QUERY_TIMEOUT') \
- if self.config.QUERY_TIMEOUT else None
+ if self.config.QUERY_TIMEOUT else None
self.reverse_restrict_to_country_area = self.config.get_bool('SEARCH_WITHIN_COUNTRIES')
self.server_version = 0
if sys.version_info >= (3, 10):
self._engine_lock = asyncio.Lock()
else:
- self._engine_lock = asyncio.Lock(loop=loop) # pylint: disable=unexpected-keyword-arg
+ self._engine_lock = asyncio.Lock(loop=loop)
self._engine: Optional[sa_asyncio.AsyncEngine] = None
self._tables: Optional[SearchTables] = None
self._property_cache: Dict[str, Any] = {'DB:server_version': 0}
-
async def setup_database(self) -> None:
""" Set up the SQL engine and connections.
extra_args['max_overflow'] = 0
extra_args['pool_size'] = self.config.get_int('API_POOL_SIZE')
-
is_sqlite = self.config.DATABASE_DSN.startswith('sqlite:')
if is_sqlite:
self._property_cache['DB:server_version'] = server_version
- self._tables = SearchTables(sa.MetaData()) # pylint: disable=no-member
+ self._tables = SearchTables(sa.MetaData())
self._engine = engine
-
async def close(self) -> None:
""" Close all active connections to the database. The NominatimAPIAsync
            object remains usable after closing. If a new API function is
if self._engine is not None:
await self._engine.dispose()
-
async def __aenter__(self) -> 'NominatimAPIAsync':
return self
-
async def __aexit__(self, *_: Any) -> None:
await self.close()
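    # Editorial usage sketch: driving the asynchronous frontend from application
    # code.  The project directory and coordinates are invented; only the
    # context-manager protocol and the reverse()/search() coroutines defined in
    # this class are relied upon.
    #
    #     async def example() -> None:
    #         async with NominatimAPIAsync('/srv/nominatim-project') as api:
    #             place = await api.reverse((8.68, 49.41))
    #             hits = await api.search('Heidelberg, Germany')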
-
@contextlib.asynccontextmanager
async def begin(self) -> AsyncIterator[SearchConnection]:
""" Create a new connection with automatic transaction handling.
async with self._engine.begin() as conn:
yield SearchConnection(conn, self._tables, self._property_cache)
-
async def status(self) -> StatusResult:
""" Return the status of the database.
"""
return status
-
async def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
""" Get detailed information about a place in the database.
await make_query_analyzer(conn)
return await get_detailed_place(conn, place, details)
-
async def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
""" Get simple information about a list of places.
return SearchResults(filter(None,
[await get_simple_place(conn, p, details) for p in places]))
-
async def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
""" Find a place by its coordinates. Also known as reverse geocoding.
self.reverse_restrict_to_country_area)
return await geocoder.lookup(coord)
-
async def search(self, query: str, **params: Any) -> SearchResults:
""" Find a place by free-text search. Also known as forward geocoding.
"""
async with self.begin() as conn:
conn.set_query_timeout(self.query_timeout)
geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params),
- self.config.get_int('REQUEST_TIMEOUT') \
- if self.config.REQUEST_TIMEOUT else None)
+ self.config.get_int('REQUEST_TIMEOUT')
+ if self.config.REQUEST_TIMEOUT else None)
phrases = [Phrase(PhraseType.NONE, p.strip()) for p in query.split(',')]
return await geocoder.lookup(phrases)
-
- # pylint: disable=too-many-arguments,too-many-branches
async def search_address(self, amenity: Optional[str] = None,
street: Optional[str] = None,
city: Optional[str] = None,
details.layers |= ntyp.DataLayer.POI
geocoder = ForwardGeocoder(conn, details,
- self.config.get_int('REQUEST_TIMEOUT') \
- if self.config.REQUEST_TIMEOUT else None)
+ self.config.get_int('REQUEST_TIMEOUT')
+ if self.config.REQUEST_TIMEOUT else None)
return await geocoder.lookup(phrases)
-
async def search_category(self, categories: List[Tuple[str, str]],
near_query: Optional[str] = None,
**params: Any) -> SearchResults:
await make_query_analyzer(conn)
geocoder = ForwardGeocoder(conn, details,
- self.config.get_int('REQUEST_TIMEOUT') \
- if self.config.REQUEST_TIMEOUT else None)
+ self.config.get_int('REQUEST_TIMEOUT')
+ if self.config.REQUEST_TIMEOUT else None)
return await geocoder.lookup_pois(categories, phrases)
-
class NominatimAPI:
""" This class provides a thin synchronous wrapper around the asynchronous
Nominatim functions. It creates its own event loop and runs each
self._loop = asyncio.new_event_loop()
self._async_api = NominatimAPIAsync(project_dir, environ, loop=self._loop)
-
def close(self) -> None:
""" Close all active connections to the database.
self._loop.run_until_complete(self._async_api.close())
self._loop.close()
-
def __enter__(self) -> 'NominatimAPI':
return self
-
def __exit__(self, *_: Any) -> None:
self.close()
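    # Editorial usage sketch for the synchronous wrapper; the query string is an
    # arbitrary example.  Each call is run to completion on the internal event
    # loop created in __init__.
    #
    #     with NominatimAPI('/srv/nominatim-project') as api:
    #         results = api.search('Birkenstraße, Berlin')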
-
@property
def config(self) -> Configuration:
""" Provide read-only access to the [configuration](Configuration.md)
"""
return self._loop.run_until_complete(self._async_api.status())
-
def details(self, place: ntyp.PlaceRef, **params: Any) -> Optional[DetailedResult]:
""" Get detailed information about a place in the database.
"""
return self._loop.run_until_complete(self._async_api.details(place, **params))
-
def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
""" Get simple information about a list of places.
"""
return self._loop.run_until_complete(self._async_api.lookup(places, **params))
-
def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]:
""" Find a place by its coordinates. Also known as reverse geocoding.
"""
return self._loop.run_until_complete(self._async_api.reverse(coord, **params))
-
def search(self, query: str, **params: Any) -> SearchResults:
""" Find a place by free-text search. Also known as forward geocoding.
return self._loop.run_until_complete(
self._async_api.search(query, **params))
-
- # pylint: disable=too-many-arguments
def search_address(self, amenity: Optional[str] = None,
street: Optional[str] = None,
city: Optional[str] = None,
self._async_api.search_address(amenity, street, city, county,
state, country, postalcode, **params))
-
def search_category(self, categories: List[Tuple[str, str]],
near_query: Optional[str] = None,
**params: Any) -> SearchResults:
Custom exception and error classes for Nominatim.
"""
+
class UsageError(Exception):
""" An error raised because of bad user input. This error will usually
not cause a stack trace to be printed unless debugging is enabled.
import re
+
class Locales:
""" Helper class for localization of names.
self._add_lang_tags('official_name', 'short_name')
self._add_tags('official_name', 'short_name', 'ref')
-
def __bool__(self) -> bool:
return len(self.languages) > 0
-
def _add_tags(self, *tags: str) -> None:
for tag in tags:
self.name_tags.append(tag)
self.name_tags.append(f"_place_{tag}")
-
def _add_lang_tags(self, *tags: str) -> None:
for tag in tags:
for lang in self.languages:
self.name_tags.append(f"{tag}:{lang}")
self.name_tags.append(f"_place_{tag}:{lang}")
-
def display_name(self, names: Optional[Mapping[str, str]]) -> str:
""" Return the best matching name from a dictionary of names
containing different name variants.
# Nothing? Return any of the other names as a default.
return next(iter(names.values()))
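    # Editorial sketch, assuming the constructor accepts a list of language
    # codes (from_accept_languages below covers HTTP Accept-Language strings);
    # the name dictionary is an invented example.
    #
    #     locale = Locales(['en', 'de'])
    #     label = locale.display_name({'name': 'München', 'name:en': 'Munich'})
    #     # presumably 'Munich', since an English name variant is available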
-
@staticmethod
def from_accept_languages(langstr: str) -> 'Locales':
""" Create a localization object from a language list in the
""" Start a new debug chapter for the given function and its parameters.
"""
-
def section(self, heading: str) -> None:
""" Start a new section with the given title.
"""
-
def comment(self, text: str) -> None:
""" Add a simple comment to the debug output.
"""
-
def var_dump(self, heading: str, var: Any) -> None:
""" Print the content of the variable to the debug output prefixed by
the given heading.
"""
-
def table_dump(self, heading: str, rows: Iterator[Optional[List[Any]]]) -> None:
""" Print the table generated by the generator function.
"""
-
def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
""" Print a list of search results generated by the generator function.
"""
-
def sql(self, conn: AsyncConnection, statement: 'sa.Executable',
params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]) -> None:
""" Print the SQL for the given statement.
"""
def format_sql(self, conn: AsyncConnection, statement: 'sa.Executable',
- extra_params: Union[Mapping[str, Any],
- Sequence[Mapping[str, Any]], None]) -> str:
+ extra_params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]
+ ) -> str:
""" Return the compiled version of the statement.
"""
compiled = cast('sa.ClauseElement', statement).compile(conn.sync_engine)
try:
sqlstr = re.sub(r'__\[POSTCOMPILE_[^]]*\]', '%s', sqlstr)
return sqlstr % tuple((repr(params.get(name, None))
- for name in compiled.positiontup)) # type: ignore
+ for name in compiled.positiontup)) # type: ignore
except TypeError:
return sqlstr
assert conn.dialect.name == 'sqlite'
# params in positional order
- pparams = (repr(params.get(name, None)) for name in compiled.positiontup) # type: ignore
+ pparams = (repr(params.get(name, None)) for name in compiled.positiontup) # type: ignore
sqlstr = re.sub(r'__\[POSTCOMPILE_([^]]*)\]', '?', sqlstr)
sqlstr = re.sub(r"\?", lambda m: next(pparams), sqlstr)
return sqlstr
+
class HTMLLogger(BaseLogger):
""" Logger that formats messages in HTML.
"""
def __init__(self) -> None:
self.buffer = io.StringIO()
-
def _timestamp(self) -> None:
self._write(f'<p class="timestamp">[{dt.datetime.now()}]</p>')
-
def get_buffer(self) -> str:
return HTML_HEADER + self.buffer.getvalue() + HTML_FOOTER
-
def function(self, func: str, **kwargs: Any) -> None:
self._timestamp()
self._write(f"<h1>Debug output for {func}()</h1>\n<p>Parameters:<dl>")
self._write(f'<dt>{name}</dt><dd>{self._python_var(value)}</dd>')
self._write('</dl></p>')
-
def section(self, heading: str) -> None:
self._timestamp()
self._write(f"<h2>{heading}</h2>")
-
def comment(self, text: str) -> None:
self._timestamp()
self._write(f"<p>{text}</p>")
-
def var_dump(self, heading: str, var: Any) -> None:
self._timestamp()
if callable(var):
self._write(f'<h5>{heading}</h5>{self._python_var(var)}')
-
def table_dump(self, heading: str, rows: Iterator[Optional[List[Any]]]) -> None:
self._timestamp()
head = next(rows)
self._write('</tr>')
self._write('</tbody></table>')
-
def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
""" Print a list of search results generated by the generator function.
"""
self._timestamp()
+
def format_osm(osm_object: Optional[Tuple[str, int]]) -> str:
if not osm_object:
return '-'
total += 1
self._write(f'</dl><b>TOTAL:</b> {total}</p>')
-
def sql(self, conn: AsyncConnection, statement: 'sa.Executable',
params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]) -> None:
self._timestamp()
else:
self._write(f'<code class="lang-sql">{html.escape(sqlstr)}</code>')
-
def _python_var(self, var: Any) -> str:
if CODE_HIGHLIGHT:
fmt = highlight(str(var), PythonLexer(), HtmlFormatter(nowrap=True))
return f'<code class="lang-python">{html.escape(str(var))}</code>'
-
def _write(self, text: str) -> None:
""" Add the raw text to the debug output.
"""
def __init__(self) -> None:
self.buffer = io.StringIO()
-
def _timestamp(self) -> None:
self._write(f'[{dt.datetime.now()}]\n')
-
def get_buffer(self) -> str:
return self.buffer.getvalue()
-
def function(self, func: str, **kwargs: Any) -> None:
self._write(f"#### Debug output for {func}()\n\nParameters:\n")
for name, value in kwargs.items():
self._write(f' {name}: {self._python_var(value)}\n')
self._write('\n')
-
def section(self, heading: str) -> None:
self._timestamp()
self._write(f"\n# {heading}\n\n")
-
def comment(self, text: str) -> None:
self._write(f"{text}\n")
-
def var_dump(self, heading: str, var: Any) -> None:
if callable(var):
var = var()
self._write(f'{heading}:\n {self._python_var(var)}\n\n')
-
def table_dump(self, heading: str, rows: Iterator[Optional[List[Any]]]) -> None:
self._write(f'{heading}:\n')
data = [list(map(self._python_var, row)) if row else None for row in rows]
maxlens = [max(len(d[i]) for d in data if d) for i in range(num_cols)]
tablewidth = sum(maxlens) + 3 * num_cols + 1
- row_format = '| ' +' | '.join(f'{{:<{l}}}' for l in maxlens) + ' |\n'
+ row_format = '| ' + ' | '.join(f'{{:<{ln}}}' for ln in maxlens) + ' |\n'
self._write('-'*tablewidth + '\n')
self._write(row_format.format(*data[0]))
self._write('-'*tablewidth + '\n')
if data[-1]:
self._write('-'*tablewidth + '\n')
-
def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
self._timestamp()
self._write(f'{heading}:\n')
total += 1
self._write(f'TOTAL: {total}\n\n')
-
def sql(self, conn: AsyncConnection, statement: 'sa.Executable',
params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]) -> None:
self._timestamp()
sqlstr = '\n| '.join(textwrap.wrap(self.format_sql(conn, statement, params), width=78))
self._write(f"| {sqlstr}\n\n")
-
def _python_var(self, var: Any) -> str:
return str(var)
-
def _write(self, text: str) -> None:
self.buffer.write(text)
<title>Nominatim - Debug</title>
<style>
""" + \
-(HtmlFormatter(nobackground=True).get_style_defs('.highlight') if CODE_HIGHLIGHT else '') +\
-"""
+ (HtmlFormatter(nobackground=True).get_style_defs('.highlight') if CODE_HIGHLIGHT else '') + \
+ """
h2 { font-size: x-large }
dl {
async def find_in_all_tables(conn: SearchConnection, place: ntyp.PlaceRef,
add_geometries: GeomFunc
- ) -> Tuple[Optional[SaRow], RowFunc[nres.BaseResultT]]:
+ ) -> Tuple[Optional[SaRow], RowFunc[nres.BaseResultT]]:
""" Search for the given place in all data tables
and return the base information.
"""
return sql.add_columns(*out)
-
row_func: RowFunc[nres.SearchResult]
row, row_func = await find_in_all_tables(conn, place, _add_geometry)
from .server.content_types import CONTENT_JSON
-T = TypeVar('T') # pylint: disable=invalid-name
+T = TypeVar('T')
FormatFunc = Callable[[T, Mapping[str, Any]], str]
ErrorFormatFunc = Callable[[str, str, int], str]
self.content_types.update(content_types)
self.format_functions: Dict[Type[Any], Dict[str, FormatFunc[Any]]] = defaultdict(dict)
-
def format_func(self, result_class: Type[T],
fmt: str) -> Callable[[FormatFunc[T]], FormatFunc[T]]:
""" Decorator for a function that formats a given type of result into the
return decorator
-
def error_format_func(self, func: ErrorFormatFunc) -> ErrorFormatFunc:
""" Decorator for a function that formats error messges.
There is only one error formatter per dispatcher. Using
self.error_handler = func
return func
-
def list_formats(self, result_type: Type[Any]) -> List[str]:
""" Return a list of formats supported by this formatter.
"""
return list(self.format_functions[result_type].keys())
-
def supports_format(self, result_type: Type[Any], fmt: str) -> bool:
""" Check if the given format is supported by this formatter.
"""
return fmt in self.format_functions[result_type]
-
def format_result(self, result: Any, fmt: str, options: Mapping[str, Any]) -> str:
""" Convert the given result into a string using the given format.
"""
return self.format_functions[type(result)][fmt](result, options)
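    # Editorial sketch of how registration and dispatch fit together.  The class
    # name FormatDispatcher, the 'dispatch' instance, the 'text' format and the
    # assumption that StatusResult carries a numeric status attribute are not
    # taken from this fragment; only format_func()/format_result() are.
    #
    #     dispatch = FormatDispatcher()
    #
    #     @dispatch.format_func(StatusResult, 'text')
    #     def _status_to_text(result: StatusResult, _: Mapping[str, Any]) -> str:
    #         return 'OK' if result.status == 0 else 'ERROR'
    #
    #     body = dispatch.format_result(some_status_result, 'text', {})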
-
def format_error(self, content_type: str, msg: str, status: int) -> str:
""" Convert the given error message into a response string
taking the requested content_type into account.
"""
return self.error_handler(content_type, msg, status)
-
def set_content_type(self, fmt: str, content_type: str) -> None:
""" Set the content type for the given format. This is the string
that will be returned in the Content-Type header of the HTML
"""
self.content_types[fmt] = content_type
-
def get_content_type(self, fmt: str) -> str:
""" Return the content type for the given format.
from .localization import Locales
# This file defines complex result data classes.
-# pylint: disable=too-many-instance-attributes
+
def _mingle_name_tags(names: Optional[Dict[str, str]]) -> Optional[Dict[str, str]]:
""" Mix-in names from linked places, so that they show up
return label_parts
-
@dataclasses.dataclass
class WordInfo:
""" Each entry in the list of search terms contains the
category: Tuple[str, str]
centroid: Point
- place_id : Optional[int] = None
+ place_id: Optional[int] = None
osm_object: Optional[Tuple[str, int]] = None
parent_place_id: Optional[int] = None
linked_place_id: Optional[int] = None
"""
return self.centroid[1]
-
@property
def lon(self) -> float:
""" Get the longitude (or x) of the center point of the place.
"""
return self.centroid[0]
-
def calculated_importance(self) -> float:
""" Get a valid importance value. This is either the stored importance
of the value or an artificial value computed from the place's
"""
return self.importance or (0.40001 - (self.rank_search/75.0))
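        # Worked example (editorial): with no stored importance, rank_search == 30
        # yields 0.40001 - 30 / 75.0 == 0.00001 and rank_search == 15 yields
        # 0.20001, so coarser places without an importance value rank higher.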
-
def localize(self, locales: Locales) -> None:
""" Fill the locale_name and the display_name field for the
place and, if available, its address information.
self.display_name = self.locale_name
-
BaseResultT = TypeVar('BaseResultT', bound=BaseResult)
+
@dataclasses.dataclass
class DetailedResult(BaseResult):
""" A search result with more internal information from the database
bbox: Optional[Bbox] = None
accuracy: float = 0.0
-
@property
def ranking(self) -> float:
""" Return the ranking, a combined measure of accuracy and importance.
"""
return (self.accuracy if self.accuracy is not None else 1) \
- - self.calculated_importance()
+ - self.calculated_importance()
class SearchResults(List[SearchResult]):
def _filter_geometries(row: SaRow) -> Dict[str, str]:
- return {k[9:]: v for k, v in row._mapping.items() # pylint: disable=W0212
+ return {k[9:]: v for k, v in row._mapping.items()
if k.startswith('geometry_')}
place_id=row.place_id,
osm_object=(row.osm_type, row.osm_id),
category=(row.class_, row.type),
- parent_place_id = row.parent_place_id,
- linked_place_id = getattr(row, 'linked_place_id', None),
- admin_level = getattr(row, 'admin_level', 15),
+ parent_place_id=row.parent_place_id,
+ linked_place_id=getattr(row, 'linked_place_id', None),
+ admin_level=getattr(row, 'admin_level', 15),
names=_mingle_name_tags(row.name),
address=row.address,
extratags=row.extratags,
res = class_type(source_table=SourceTable.OSMLINE,
place_id=row.place_id,
- parent_place_id = row.parent_place_id,
+ parent_place_id=row.parent_place_id,
osm_object=('W', row.osm_id),
category=('place', 'houses' if hnr is None else 'house'),
address=row.address,
res = class_type(source_table=SourceTable.TIGER,
place_id=row.place_id,
- parent_place_id = row.parent_place_id,
+ parent_place_id=row.parent_place_id,
osm_object=(osm_type or row.osm_type, osm_id or row.osm_id),
category=('place', 'houses' if hnr is None else 'house'),
postcode=row.postcode,
def create_from_postcode_row(row: Optional[SaRow],
- class_type: Type[BaseResultT]) -> Optional[BaseResultT]:
+ class_type: Type[BaseResultT]) -> Optional[BaseResultT]:
""" Construct a new result and add the data from the result row
from the postcode table. 'class_type' defines
the type of result to return. Returns None if the row is None.
return class_type(source_table=SourceTable.POSTCODE,
place_id=row.place_id,
- parent_place_id = row.parent_place_id,
+ parent_place_id=row.parent_place_id,
category=('place', 'postcode'),
names={'ref': row.postcode},
rank_search=row.rank_search,
def create_from_country_row(row: Optional[SaRow],
- class_type: Type[BaseResultT]) -> Optional[BaseResultT]:
+ class_type: Type[BaseResultT]) -> Optional[BaseResultT]:
""" Construct a new result and add the data from the result row
from the fallback country tables. 'class_type' defines
the type of result to return. Returns None if the row is None.
distance=0.0))
result.address_rows.append(AddressLine(
category=('place', 'country_code'),
- names={'ref': result.country_code}, extratags = {},
+ names={'ref': result.country_code}, extratags={},
fromarea=True, isaddress=False, rank_address=4,
distance=0.0))
for result in results:
_setup_address_details(result)
- ### Lookup entries from place_address line
+ # Lookup entries from place_address line
lookup_ids = [{'pid': r.place_id,
'lid': _get_address_lookup_id(r),
'names': list(r.address.values()) if r.address else [],
- 'c': ('SRID=4326;' + r.centroid.to_wkt()) if r.centroid else '' }
+ 'c': ('SRID=4326;' + r.centroid.to_wkt()) if r.centroid else ''}
for r in results if r.place_id]
if not lookup_ids:
.order_by(taddr.c.distance.desc())\
.order_by(t.c.rank_search.desc())
-
current_result = None
current_rank_address = -1
for row in await conn.execute(sql):
for result in results:
await _finalize_entry(conn, result)
-
- ### Finally add the record for the parent entry where necessary.
+ # Finally add the record for the parent entry where necessary.
parent_lookup_ids = list(filter(lambda e: e['pid'] != e['lid'], lookup_ids))
if parent_lookup_ids:
t.c.class_, t.c.type, t.c.extratags,
t.c.admin_level,
t.c.rank_address)\
- .where(t.c.place_id == ltab.c.value['lid'].as_integer())
+ .where(t.c.place_id == ltab.c.value['lid'].as_integer())
for row in await conn.execute(sql):
current_result = next((r for r in results if r.place_id == row.src_place_id), None)
fromarea=True, isaddress=True,
rank_address=row.rank_address, distance=0.0))
- ### Now sort everything
+ # Now sort everything
def mk_sort_key(place_id: Optional[int]) -> Callable[[AddressLine], Tuple[bool, int, bool]]:
return lambda a: (a.place_id != place_id, -a.rank_address, a.isaddress)
return
sql = _placex_select_address_row(conn, result.centroid)\
- .where(conn.t.placex.c.linked_place_id == result.place_id)
+ .where(conn.t.placex.c.linked_place_id == result.place_id)
for row in await conn.execute(sql):
result.linked_rows.append(_result_row_to_address_row(row))
return
sql = _placex_select_address_row(conn, result.centroid)\
- .where(conn.t.placex.c.parent_place_id == result.place_id)\
- .where(conn.t.placex.c.rank_search == 30)
+ .where(conn.t.placex.c.parent_place_id == result.place_id)\
+ .where(conn.t.placex.c.rank_search == 30)
for row in await conn.execute(sql):
result.parented_rows.append(_result_row_to_address_row(row))
import sqlalchemy as sa
-from .typing import SaColumn, SaSelect, SaFromClause, SaLabel, SaRow,\
+from .typing import SaColumn, SaSelect, SaFromClause, SaLabel, SaRow, \
SaBind, SaLambdaSelect
from .sql.sqlalchemy_types import Geometry
from .connection import SearchConnection
WKT_PARAM: SaBind = sa.bindparam('wkt', type_=Geometry)
MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank')
+
def no_index(expr: SaColumn) -> SaColumn:
""" Wrap the given expression, so that the query planner will
refrain from using the expression for index lookup.
"""
- return sa.func.coalesce(sa.null(), expr) # pylint: disable=not-callable
+ return sa.func.coalesce(sa.null(), expr)
def _select_from_placex(t: SaFromClause, use_wkt: bool = True) -> SaSelect:
centroid = sa.case((t.c.geometry.is_line_like(), t.c.geometry.ST_ClosestPoint(WKT_PARAM)),
else_=t.c.centroid).label('centroid')
-
return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
t.c.class_, t.c.type,
t.c.address, t.c.extratags,
def _interpolated_housenumber(table: SaFromClause) -> SaLabel:
return sa.cast(table.c.startnumber
- + sa.func.round(((table.c.endnumber - table.c.startnumber) * table.c.position)
- / table.c.step) * table.c.step,
+ + sa.func.round(((table.c.endnumber - table.c.startnumber) * table.c.position)
+ / table.c.step) * table.c.step,
sa.Integer).label('housenumber')
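# Worked example (editorial): for an interpolation row with startnumber=2,
# endnumber=10, step=2 and position=0.5 the expression above evaluates to
# 2 + round((10 - 2) * 0.5 / 2) * 2 = 6, i.e. the even house number closest
# to the middle of the interpolation line.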
fac = sa.cast(table.c.step, sa.Float) / (table.c.endnumber - table.c.startnumber)
rounded_pos = sa.func.round(table.c.position / fac) * fac
return sa.case(
- (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
- else_=table.c.linegeo.ST_LineInterpolatePoint(rounded_pos)).label('centroid')
+ (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
+ else_=table.c.linegeo.ST_LineInterpolatePoint(rounded_pos)).label('centroid')
def _locate_interpolation(table: SaFromClause) -> SaLabel:
self.bind_params: Dict[str, Any] = {'max_rank': params.max_rank}
-
@property
def max_rank(self) -> int:
""" Return the maximum configured rank.
"""
return self.params.max_rank
-
def has_geometries(self) -> bool:
""" Check if any geometries are requested.
"""
return bool(self.params.geometry_output)
-
def layer_enabled(self, *layer: DataLayer) -> bool:
""" Return true when any of the given layer types are requested.
"""
- return any(self.params.layers & l for l in layer)
-
+ return any(self.params.layers & ly for ly in layer)
def layer_disabled(self, *layer: DataLayer) -> bool:
""" Return true when none of the given layer types is requested.
"""
- return not any(self.params.layers & l for l in layer)
-
+ return not any(self.params.layers & ly for ly in layer)
def has_feature_layers(self) -> bool:
""" Return true if any layer other than ADDRESS or POI is requested.
"""
return self.layer_enabled(DataLayer.RAILWAY, DataLayer.MANMADE, DataLayer.NATURAL)
-
def _add_geometry_columns(self, sql: SaLambdaSelect, col: SaColumn) -> SaSelect:
out = []
return sql.add_columns(*out)
-
def _filter_by_layer(self, table: SaFromClause) -> SaColumn:
if self.layer_enabled(DataLayer.MANMADE):
exclude = []
include.extend(('natural', 'water', 'waterway'))
return table.c.class_.in_(tuple(include))
-
async def _find_closest_street_or_poi(self, distance: float) -> Optional[SaRow]:
""" Look up the closest rank 26+ place in the database, which
is closer than the given distance.
# when used with prepared statements
diststr = sa.text(f"{distance}")
- sql: SaLambdaSelect = sa.lambda_stmt(lambda: _select_from_placex(t)
- .where(t.c.geometry.within_distance(WKT_PARAM, diststr))
- .where(t.c.indexed_status == 0)
- .where(t.c.linked_place_id == None)
- .where(sa.or_(sa.not_(t.c.geometry.is_area()),
- t.c.centroid.ST_Distance(WKT_PARAM) < diststr))
- .order_by('distance')
- .limit(2))
+ sql: SaLambdaSelect = sa.lambda_stmt(
+ lambda: _select_from_placex(t)
+ .where(t.c.geometry.within_distance(WKT_PARAM, diststr))
+ .where(t.c.indexed_status == 0)
+ .where(t.c.linked_place_id == None)
+ .where(sa.or_(sa.not_(t.c.geometry.is_area()),
+ t.c.centroid.ST_Distance(WKT_PARAM) < diststr))
+ .order_by('distance')
+ .limit(2))
if self.has_geometries():
sql = self._add_geometry_columns(sql, t.c.geometry)
return prev_row
-
async def _find_housenumber_for_street(self, parent_place_id: int) -> Optional[SaRow]:
t = self.conn.t.placex
return (await self.conn.execute(sql, self.bind_params)).one_or_none()
-
async def _find_interpolation_for_street(self, parent_place_id: Optional[int],
distance: float) -> Optional[SaRow]:
t = self.conn.t.osmline
inner = sql.subquery('ipol')
sql = sa.select(inner.c.place_id, inner.c.osm_id,
- inner.c.parent_place_id, inner.c.address,
- _interpolated_housenumber(inner),
- _interpolated_position(inner),
- inner.c.postcode, inner.c.country_code,
- inner.c.distance)
+ inner.c.parent_place_id, inner.c.address,
+ _interpolated_housenumber(inner),
+ _interpolated_position(inner),
+ inner.c.postcode, inner.c.country_code,
+ inner.c.distance)
if self.has_geometries():
sub = sql.subquery('geom')
return (await self.conn.execute(sql, self.bind_params)).one_or_none()
-
async def _find_tiger_number_for_street(self, parent_place_id: int) -> Optional[SaRow]:
t = self.conn.t.tiger
return (await self.conn.execute(sql, self.bind_params)).one_or_none()
-
async def lookup_street_poi(self) -> Tuple[Optional[SaRow], RowFunc]:
""" Find a street or POI/address for the given WKT point.
"""
return row, row_func
-
async def _lookup_area_address(self) -> Optional[SaRow]:
""" Lookup large addressable areas for the given WKT point.
"""
.subquery('area')
return _select_from_placex(inner, False)\
- .where(inner.c.geometry.ST_Contains(WKT_PARAM))\
- .order_by(sa.desc(inner.c.rank_search))\
- .limit(1)
+ .where(inner.c.geometry.ST_Contains(WKT_PARAM))\
+ .order_by(sa.desc(inner.c.rank_search))\
+ .limit(1)
sql: SaLambdaSelect = sa.lambda_stmt(_base_query)
if self.has_geometries():
def _place_inside_area_query() -> SaSelect:
inner = \
- sa.select(t,
- t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
- .where(t.c.rank_search > address_rank)\
- .where(t.c.rank_search <= MAX_RANK_PARAM)\
- .where(t.c.indexed_status == 0)\
- .where(sa.func.IntersectsReverseDistance(t, WKT_PARAM))\
- .order_by(sa.desc(t.c.rank_search))\
- .limit(50)\
- .subquery('places')
+ sa.select(t, t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
+ .where(t.c.rank_search > address_rank)\
+ .where(t.c.rank_search <= MAX_RANK_PARAM)\
+ .where(t.c.indexed_status == 0)\
+ .where(sa.func.IntersectsReverseDistance(t, WKT_PARAM))\
+ .order_by(sa.desc(t.c.rank_search))\
+ .limit(50)\
+ .subquery('places')
touter = t.alias('outer')
return _select_from_placex(inner, False)\
return address_row
-
async def _lookup_area_others(self) -> Optional[SaRow]:
t = self.conn.t.placex
.subquery()
sql = _select_from_placex(inner, False)\
- .where(sa.or_(sa.not_(inner.c.geometry.is_area()),
- inner.c.geometry.ST_Contains(WKT_PARAM)))\
- .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
- .limit(1)
+ .where(sa.or_(sa.not_(inner.c.geometry.is_area()),
+ inner.c.geometry.ST_Contains(WKT_PARAM)))\
+ .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
+ .limit(1)
if self.has_geometries():
sql = self._add_geometry_columns(sql, inner.c.geometry)
return row
-
async def lookup_area(self) -> Optional[SaRow]:
""" Lookup large areas for the current search.
"""
return _get_closest(address_row, other_row)
-
async def lookup_country_codes(self) -> List[str]:
""" Lookup the country for the current search.
"""
log().var_dump('Country codes', ccodes)
return ccodes
-
async def lookup_country(self, ccodes: List[str]) -> Optional[SaRow]:
""" Lookup the country for the current search.
"""
log().comment('Search for place nodes in country')
def _base_query() -> SaSelect:
- inner = \
- sa.select(t,
- t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
- .where(t.c.rank_search > 4)\
- .where(t.c.rank_search <= MAX_RANK_PARAM)\
- .where(t.c.indexed_status == 0)\
- .where(t.c.country_code.in_(ccodes))\
- .where(sa.func.IntersectsReverseDistance(t, WKT_PARAM))\
- .order_by(sa.desc(t.c.rank_search))\
- .limit(50)\
- .subquery('area')
+ inner = sa.select(t, t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
+ .where(t.c.rank_search > 4)\
+ .where(t.c.rank_search <= MAX_RANK_PARAM)\
+ .where(t.c.indexed_status == 0)\
+ .where(t.c.country_code.in_(ccodes))\
+ .where(sa.func.IntersectsReverseDistance(t, WKT_PARAM))\
+ .order_by(sa.desc(t.c.rank_search))\
+ .limit(50)\
+ .subquery('area')
return _select_from_placex(inner, False)\
.where(sa.func.IsBelowReverseDistance(inner.c.distance, inner.c.rank_search))\
return address_row
-
async def lookup(self, coord: AnyPoint) -> Optional[nres.ReverseResult]:
""" Look up a single coordinate. Returns the place information,
if a place was found near the coordinates or None otherwise.
"""
log().function('reverse_lookup', coord=coord, params=self.params)
-
self.bind_params['wkt'] = f'POINT({coord[0]} {coord[1]})'
row: Optional[SaRow] = None
class _PoiData(dbf.SearchData):
penalty = 0.0
qualifiers = dbf.WeightedCategories(category, [0.0] * len(category))
- countries=ccs
+ countries = ccs
return dbs.PoiSearch(_PoiData())
self.query = query
self.details = details
-
@property
def configured_for_country(self) -> bool:
""" Return true if the search details are configured to
allow countries in the result.
"""
return self.details.min_rank <= 4 and self.details.max_rank >= 4 \
- and self.details.layer_enabled(DataLayer.ADDRESS)
-
+ and self.details.layer_enabled(DataLayer.ADDRESS)
@property
def configured_for_postcode(self) -> bool:
allow postcodes in the result.
"""
return self.details.min_rank <= 5 and self.details.max_rank >= 11\
- and self.details.layer_enabled(DataLayer.ADDRESS)
-
+ and self.details.layer_enabled(DataLayer.ADDRESS)
@property
def configured_for_housenumbers(self) -> bool:
allow addresses in the result.
"""
return self.details.max_rank >= 30 \
- and self.details.layer_enabled(DataLayer.ADDRESS)
-
+ and self.details.layer_enabled(DataLayer.ADDRESS)
def build(self, assignment: TokenAssignment) -> Iterator[dbs.AbstractSearch]:
""" Yield all possible abstract searches for the given token assignment.
near_items = self.get_near_items(assignment)
if near_items is not None and not near_items:
- return # impossible compbination of near items and category parameter
+ return # impossible combination of near items and category parameter
if assignment.name is None:
if near_items and not sdata.postcodes:
search.penalty += assignment.penalty
yield search
-
def build_poi_search(self, sdata: dbf.SearchData) -> Iterator[dbs.AbstractSearch]:
""" Build abstract search query for a simple category search.
This kind of search requires an additional geographic constraint.
and ((self.details.viewbox and self.details.bounded_viewbox) or self.details.near):
yield dbs.PoiSearch(sdata)
-
def build_special_search(self, sdata: dbf.SearchData,
address: List[TokenRange],
is_category: bool) -> Iterator[dbs.AbstractSearch]:
penalty += 0.2
yield dbs.PostcodeSearch(penalty, sdata)
-
def build_housenumber_search(self, sdata: dbf.SearchData, hnrs: List[Token],
address: List[TokenRange]) -> Iterator[dbs.AbstractSearch]:
""" Build a simple address search for special entries where the
expected_count = sum(t.count for t in hnrs)
partials = {t.token: t.addr_count for trange in address
- for t in self.query.get_partials_list(trange)}
+ for t in self.query.get_partials_list(trange)}
if not partials:
# can happen when none of the partials is indexed
sdata.housenumbers = dbf.WeightedStrings([], [])
yield dbs.PlaceSearch(0.05, sdata, expected_count)
-
def build_name_search(self, sdata: dbf.SearchData,
name: TokenRange, address: List[TokenRange],
is_category: bool) -> Iterator[dbs.AbstractSearch]:
sdata.lookups = lookup
yield dbs.PlaceSearch(penalty + name_penalty, sdata, count)
-
- def yield_lookups(self, name: TokenRange, address: List[TokenRange])\
- -> Iterator[Tuple[float, int, List[dbf.FieldLookup]]]:
+ def yield_lookups(self, name: TokenRange, address: List[TokenRange]
+ ) -> Iterator[Tuple[float, int, List[dbf.FieldLookup]]]:
""" Yield all variants how the given name and address should best
be searched for. This takes into account how frequent the terms
are and tries to find a lookup that optimizes index use.
"""
- penalty = 0.0 # extra penalty
+ penalty = 0.0 # extra penalty
name_partials = {t.token: t for t in self.query.get_partials_list(name)}
addr_partials = [t for r in address for t in self.query.get_partials_list(r)]
fulls_count = sum(t.count for t in name_fulls)
if fulls_count < 50000 or addr_count < 30000:
- yield penalty,fulls_count / (2**len(addr_tokens)), \
+ yield penalty, fulls_count / (2**len(addr_tokens)), \
self.get_full_name_ranking(name_fulls, addr_partials,
fulls_count > 30000 / max(1, len(addr_tokens)))
if exp_count < 10000 and addr_count < 20000:
penalty += 0.35 * max(1 if name_fulls else 0.1,
5 - len(name_partials) - len(addr_tokens))
- yield penalty, exp_count,\
- self.get_name_address_ranking(list(name_partials.keys()), addr_partials)
-
+ yield penalty, exp_count, \
+ self.get_name_address_ranking(list(name_partials.keys()), addr_partials)
def get_name_address_ranking(self, name_tokens: List[int],
addr_partials: List[Token]) -> List[dbf.FieldLookup]:
return lookup
-
def get_full_name_ranking(self, name_fulls: List[Token], addr_partials: List[Token],
use_lookup: bool) -> List[dbf.FieldLookup]:
""" Create a ranking expression with full name terms and
return dbf.lookup_by_any_name([t.token for t in name_fulls],
addr_restrict_tokens, addr_lookup_tokens)
-
def get_name_ranking(self, trange: TokenRange,
db_field: str = 'name_vector') -> dbf.FieldRanking:
""" Create a ranking expression for a name term in the given range.
default = sum(t.penalty for t in name_partials) + 0.2
return dbf.FieldRanking(db_field, default, ranks)
-
def get_addr_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
""" Create a list of ranking expressions for an address term
for the given ranges.
heapq.heappush(todo, (0, trange.start, dbf.RankedTokens(0.0, [])))
ranks: List[dbf.RankedTokens] = []
- while todo: # pylint: disable=too-many-nested-blocks
+ while todo:
neglen, pos, rank = heapq.heappop(todo)
for tlist in self.query.nodes[pos].starting:
if tlist.ttype in (TokenType.PARTIAL, TokenType.WORD):
return dbf.FieldRanking('nameaddress_vector', default, ranks)
-
def get_search_data(self, assignment: TokenAssignment) -> Optional[dbf.SearchData]:
""" Collect the tokens for the non-name search fields in the
assignment.
return sdata
-
def get_country_tokens(self, trange: TokenRange) -> List[Token]:
""" Return the list of country tokens for the given range,
optionally filtered by the country list from the details
return tokens
-
def get_qualifier_tokens(self, trange: TokenRange) -> List[Token]:
""" Return the list of qualifier tokens for the given range,
optionally filtered by the qualifier list from the details
return tokens
-
def get_near_items(self, assignment: TokenAssignment) -> Optional[dbf.WeightedCategories]:
""" Collect tokens for near items search or use the categories
requested per parameter.
def __bool__(self) -> bool:
return bool(self.values)
-
def __iter__(self) -> Iterator[Tuple[str, float]]:
return iter(zip(self.values, self.penalties))
-
def get_penalty(self, value: str, default: float = 1000.0) -> float:
""" Get the penalty for the given value. Returns the given default
if the value does not exist.
def __bool__(self) -> bool:
return bool(self.values)
-
def __iter__(self) -> Iterator[Tuple[Tuple[str, str], float]]:
return iter(zip(self.values, self.penalties))
-
def get_penalty(self, value: Tuple[str, str], default: float = 1000.0) -> float:
""" Get the penalty for the given value. Returns the given default
if the value does not exist.
pass
return default
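    # Editorial sketch: values and penalties are positionally parallel lists and
    # get_penalty() falls back to the supplied default for unknown entries.
    #
    #     cats = WeightedCategories([('amenity', 'restaurant')], [0.1])
    #     cats.get_penalty(('amenity', 'restaurant'))         # -> 0.1
    #     cats.get_penalty(('amenity', 'cafe'), default=2.0)  # -> 2.0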
-
def sql_restrict(self, table: SaFromClause) -> SaExpression:
""" Return an SQLAlcheny expression that restricts the
class and type columns of the given table to the values
ranking.penalty -= min_penalty
return min_penalty
-
def sql_penalty(self, table: SaFromClause) -> SaColumn:
""" Create an SQL expression for the rankings.
"""
qualifiers: WeightedCategories = WeightedCategories([], [])
-
def set_strings(self, field: str, tokens: List[Token]) -> None:
""" Set on of the WeightedStrings properties from the given
token list. Adapt the global penalty, so that the
setattr(self, field, wstrs)
-
def set_qualifiers(self, tokens: List[Token]) -> None:
""" Set the qulaifier field from the given tokens.
"""
self.qualifiers = WeightedCategories(list(categories.keys()),
list(categories.values()))
-
def set_ranking(self, rankings: List[FieldRanking]) -> None:
""" Set the list of rankings and normalize the ranking.
"""
from ..typing import SaFromClause
from ..sql.sqlalchemy_types import IntArray
-# pylint: disable=consider-using-f-string
LookupType = sa.sql.expression.FunctionElement[Any]
+
class LookupAll(LookupType):
""" Find all entries in search_name table that contain all of
a given list of tokens using an index for the search.
@compiles(LookupAll, 'sqlite')
def _sqlite_lookup_all(element: LookupAll,
- compiler: 'sa.Compiled', **kw: Any) -> str:
+ compiler: 'sa.Compiled', **kw: Any) -> str:
place, col, colname, tokens = list(element.clauses)
return "(%s IN (SELECT CAST(value as bigint) FROM"\
" (SELECT array_intersect_fuzzy(places) as p FROM"\
" ORDER BY length(places)) as x) as u,"\
" json_each('[' || u.p || ']'))"\
" AND array_contains(%s, %s))"\
- % (compiler.process(place, **kw),
- compiler.process(tokens, **kw),
- compiler.process(colname, **kw),
- compiler.process(col, **kw),
- compiler.process(tokens, **kw)
- )
-
+ % (compiler.process(place, **kw),
+ compiler.process(tokens, **kw),
+ compiler.process(colname, **kw),
+ compiler.process(col, **kw),
+ compiler.process(tokens, **kw))
class LookupAny(LookupType):
super().__init__(table.c.place_id, getattr(table.c, column), column,
sa.type_coerce(tokens, IntArray))
+
@compiles(LookupAny)
def _default_lookup_any(element: LookupAny,
compiler: 'sa.Compiled', **kw: Any) -> str:
return "(%s && %s)" % (compiler.process(col, **kw),
compiler.process(tokens, **kw))
+
@compiles(LookupAny, 'sqlite')
def _sqlite_lookup_any(element: LookupAny,
- compiler: 'sa.Compiled', **kw: Any) -> str:
+ compiler: 'sa.Compiled', **kw: Any) -> str:
place, _, colname, tokens = list(element.clauses)
return "%s IN (SELECT CAST(value as bigint) FROM"\
" (SELECT array_union(places) as p FROM reverse_search_name"\
compiler.process(colname, **kw))
-
class Restrict(LookupType):
""" Find all entries that contain all of the given tokens.
Do not use an index for the search.
@compiles(Restrict)
def _default_restrict(element: Restrict,
- compiler: 'sa.Compiled', **kw: Any) -> str:
+ compiler: 'sa.Compiled', **kw: Any) -> str:
arg1, arg2 = list(element.clauses)
return "(coalesce(null, %s) @> %s)" % (compiler.process(arg1, **kw),
compiler.process(arg2, **kw))
+
@compiles(Restrict, 'sqlite')
def _sqlite_restrict(element: Restrict,
- compiler: 'sa.Compiled', **kw: Any) -> str:
+ compiler: 'sa.Compiled', **kw: Any) -> str:
return "array_contains(%s)" % compiler.process(element.clauses, **kw)
from .. import results as nres
from .db_search_fields import SearchData, WeightedCategories
-#pylint: disable=singleton-comparison,not-callable
-#pylint: disable=too-many-branches,too-many-arguments,too-many-locals,too-many-statements
def no_index(expr: SaColumn) -> SaColumn:
""" Wrap the given expression, so that the query planner will
refrain from using the expression for index lookup.
"""
- return sa.func.coalesce(sa.null(), expr) # pylint: disable=not-callable
+ return sa.func.coalesce(sa.null(), expr)
def _details_to_bind_params(details: SearchDetails) -> Dict[str, Any]:
if details.viewbox is not None and details.bounded_viewbox:
sql = sql.where(t.c.geometry.intersects(VIEWBOX_PARAM,
use_index=not avoid_index and
- details.viewbox.area < 0.2))
+ details.viewbox.area < 0.2))
return sql
as rows in the column 'nr'.
"""
vtab = sa.func.JsonArrayEach(sa.type_coerce(inp, sa.JSON))\
- .table_valued(sa.column('value', type_=sa.JSON))
+ .table_valued(sa.column('value', type_=sa.JSON))
return sa.select(sa.cast(sa.cast(vtab.c.value, sa.Text), sa.Integer).label('nr')).subquery()
self.search = search
self.categories = categories
-
async def lookup(self, conn: SearchConnection,
details: SearchDetails) -> nres.SearchResults:
""" Find results for the search in the database.
else:
min_rank = 26
max_rank = 30
- base = nres.SearchResults(r for r in base if r.source_table == nres.SourceTable.PLACEX
- and r.accuracy <= max_accuracy
- and r.bbox and r.bbox.area < 20
- and r.rank_address >= min_rank
- and r.rank_address <= max_rank)
+ base = nres.SearchResults(r for r in base
+ if (r.source_table == nres.SourceTable.PLACEX
+ and r.accuracy <= max_accuracy
+ and r.bbox and r.bbox.area < 20
+ and r.rank_address >= min_rank
+ and r.rank_address <= max_rank))
if base:
baseids = [b.place_id for b in base[:5] if b.place_id]
return results
-
async def lookup_category(self, results: nres.SearchResults,
conn: SearchConnection, ids: List[int],
category: Tuple[str, str], penalty: float,
.join(tgeom,
table.c.centroid.ST_CoveredBy(
sa.case((sa.and_(tgeom.c.rank_address > 9,
- tgeom.c.geometry.is_area()),
+ tgeom.c.geometry.is_area()),
tgeom.c.geometry),
- else_ = tgeom.c.centroid.ST_Expand(0.05))))
+ else_=tgeom.c.centroid.ST_Expand(0.05))))
inner = sql.where(tgeom.c.place_id.in_(ids))\
.group_by(table.c.place_id).subquery()
results.append(result)
-
class PoiSearch(AbstractSearch):
""" Category search in a geographic area.
"""
self.qualifiers = sdata.qualifiers
self.countries = sdata.countries
-
async def lookup(self, conn: SearchConnection,
details: SearchDetails) -> nres.SearchResults:
""" Find results for the search in the database.
def _base_query() -> SaSelect:
return _select_placex(t) \
.add_columns((-t.c.centroid.ST_Distance(NEAR_PARAM))
- .label('importance'))\
+ .label('importance'))\
.where(t.c.linked_place_id == None) \
.where(t.c.geometry.within_distance(NEAR_PARAM, NEAR_RADIUS_PARAM)) \
.order_by(t.c.centroid.ST_Distance(NEAR_PARAM)) \
classtype = self.qualifiers.values
if len(classtype) == 1:
cclass, ctype = classtype[0]
- sql: SaLambdaSelect = sa.lambda_stmt(lambda: _base_query()
- .where(t.c.class_ == cclass)
- .where(t.c.type == ctype))
+ sql: SaLambdaSelect = sa.lambda_stmt(
+ lambda: _base_query().where(t.c.class_ == cclass)
+ .where(t.c.type == ctype))
else:
sql = _base_query().where(sa.or_(*(sa.and_(t.c.class_ == cls, t.c.type == typ)
for cls, typ in classtype)))
super().__init__(sdata.penalty)
self.countries = sdata.countries
-
async def lookup(self, conn: SearchConnection,
details: SearchDetails) -> nres.SearchResults:
""" Find results for the search in the database.
ccodes = self.countries.values
sql = _select_placex(t)\
- .add_columns(t.c.importance)\
- .where(t.c.country_code.in_(ccodes))\
- .where(t.c.rank_address == 4)
+ .add_columns(t.c.importance)\
+ .where(t.c.country_code.in_(ccodes))\
+ .where(t.c.rank_address == 4)
if details.geometry_output:
sql = _add_geometry_columns(sql, t.c.geometry, details)
return results
-
async def lookup_in_country_table(self, conn: SearchConnection,
details: SearchDetails) -> nres.SearchResults:
""" Look up the country in the fallback country tables.
sql = sa.select(tgrid.c.country_code,
tgrid.c.geometry.ST_Centroid().ST_Collect().ST_Centroid()
- .label('centroid'),
+ .label('centroid'),
tgrid.c.geometry.ST_Collect().ST_Expand(0).label('bbox'))\
.where(tgrid.c.country_code.in_(self.countries.values))\
.group_by(tgrid.c.country_code)
return results
-
class PostcodeSearch(AbstractSearch):
""" Search for a postcode.
"""
self.lookups = sdata.lookups
self.rankings = sdata.rankings
-
async def lookup(self, conn: SearchConnection,
details: SearchDetails) -> nres.SearchResults:
""" Find results for the search in the database.
tsearch = conn.t.search_name
sql = sql.where(tsearch.c.place_id == t.c.parent_place_id)\
.where((tsearch.c.name_vector + tsearch.c.nameaddress_vector)
- .contains(sa.type_coerce(self.lookups[0].tokens,
- IntArray)))
+ .contains(sa.type_coerce(self.lookups[0].tokens,
+ IntArray)))
for ranking in self.rankings:
penalty += ranking.sql_penalty(conn.t.search_name)
penalty += sa.case(*((t.c.postcode == v, p) for v, p in self.postcodes),
- else_=1.0)
-
+ else_=1.0)
sql = sql.add_columns(penalty.label('accuracy'))
sql = sql.order_by('accuracy').limit(LIMIT_PARAM)
results = nres.SearchResults()
for row in await conn.execute(sql, _details_to_bind_params(details)):
p = conn.t.placex
- placex_sql = _select_placex(p).add_columns(p.c.importance)\
- .where(sa.text("""class = 'boundary'
- AND type = 'postal_code'
- AND osm_type = 'R'"""))\
- .where(p.c.country_code == row.country_code)\
- .where(p.c.postcode == row.postcode)\
- .limit(1)
+ placex_sql = _select_placex(p)\
+ .add_columns(p.c.importance)\
+ .where(sa.text("""class = 'boundary'
+ AND type = 'postal_code'
+ AND osm_type = 'R'"""))\
+ .where(p.c.country_code == row.country_code)\
+ .where(p.c.postcode == row.postcode)\
+ .limit(1)
if details.geometry_output:
placex_sql = _add_geometry_columns(placex_sql, p.c.geometry, details)
return results
-
class PlaceSearch(AbstractSearch):
""" Generic search for an address or named place.
"""
self.rankings = sdata.rankings
self.expected_count = expected_count
-
def _inner_search_name_cte(self, conn: SearchConnection,
details: SearchDetails) -> 'sa.CTE':
""" Create a subquery that preselects the rows in the search_name
NEAR_RADIUS_PARAM))
else:
sql = sql.where(t.c.centroid
- .ST_Distance(NEAR_PARAM) < NEAR_RADIUS_PARAM)
+ .ST_Distance(NEAR_PARAM) < NEAR_RADIUS_PARAM)
if self.housenumbers:
sql = sql.where(t.c.address_rank.between(16, 30))
and (details.near is None or details.near_radius is not None)\
and not self.qualifiers:
sql = sql.add_columns(sa.func.first_value(inner.c.penalty - inner.c.importance)
- .over(order_by=inner.c.penalty - inner.c.importance)
- .label('min_penalty'))
+ .over(order_by=inner.c.penalty - inner.c.importance)
+ .label('min_penalty'))
inner = sql.subquery()
return sql.cte('searches')
-
async def lookup(self, conn: SearchConnection,
details: SearchDetails) -> nres.SearchResults:
""" Find results for the search in the database.
pcs = self.postcodes.values
pc_near = sa.select(sa.func.min(tpc.c.geometry.ST_Distance(t.c.centroid)))\
- .where(tpc.c.postcode.in_(pcs))\
- .scalar_subquery()
+ .where(tpc.c.postcode.in_(pcs))\
+ .scalar_subquery()
penalty += sa.case((t.c.postcode.in_(pcs), 0.0),
else_=sa.func.coalesce(pc_near, cast(SaColumn, 2.0)))
if details.near is not None:
sql = sql.add_columns((-tsearch.c.centroid.ST_Distance(NEAR_PARAM))
- .label('importance'))
+ .label('importance'))
sql = sql.order_by(sa.desc(sa.text('importance')))
else:
sql = sql.order_by(penalty - tsearch.c.importance)
sql = sql.add_columns(tsearch.c.importance)
-
sql = sql.add_columns(penalty.label('accuracy'))\
.order_by(sa.text('accuracy'))
tiger_sql = sa.case((inner.c.country_code == 'us',
_make_interpolation_subquery(conn.t.tiger, inner,
numerals, details)
- ), else_=None)
+ ), else_=None)
else:
interpol_sql = sa.null()
tiger_sql = sa.null()
if (not details.excluded or result.place_id not in details.excluded)\
and (not self.qualifiers or result.category in self.qualifiers.values)\
and result.rank_address >= details.min_rank:
- result.accuracy += 1.0 # penalty for missing housenumber
+ result.accuracy += 1.0 # penalty for missing housenumber
results.append(result)
else:
results.append(result)
from .query_analyzer_factory import make_query_analyzer, AbstractQueryAnalyzer
from .query import Phrase, QueryStruct
+
class ForwardGeocoder:
""" Main class responsible for place search.
"""
self.timeout = dt.timedelta(seconds=timeout or 1000000)
self.query_analyzer: Optional[AbstractQueryAnalyzer] = None
-
@property
def limit(self) -> int:
""" Return the configured maximum number of search results.
"""
return self.params.max_results
-
async def build_searches(self,
phrases: List[Phrase]) -> Tuple[QueryStruct, List[AbstractSearch]]:
""" Analyse the query and return the tokenized query and list of
return query, searches
-
async def execute_searches(self, query: QueryStruct,
searches: List[AbstractSearch]) -> SearchResults:
""" Run the abstract searches against the database until a result
return SearchResults(results.values())
-
def pre_filter_results(self, results: SearchResults) -> SearchResults:
""" Remove results that are significantly worse than the
best match.
return results
-
def sort_and_cut_results(self, results: SearchResults) -> SearchResults:
""" Remove badly matching results, sort by ranking and
limit to the configured number of results.
min_rank = results[0].rank_search
min_ranking = results[0].ranking
results = SearchResults(r for r in results
- if r.ranking + 0.03 * (r.rank_search - min_rank)
- < min_ranking + 0.5)
+ if (r.ranking + 0.03 * (r.rank_search - min_rank)
+ < min_ranking + 0.5))
results = SearchResults(results[:self.limit])
return results
-
def rerank_by_query(self, query: QueryStruct, results: SearchResults) -> None:
""" Adjust the accuracy of the localized result according to how well
they match the original query.
"""
assert self.query_analyzer is not None
qwords = [word for phrase in query.source
- for word in re.split('[, ]+', phrase.text) if word]
+ for word in re.split('[, ]+', phrase.text) if word]
if not qwords:
return
distance *= 2
result.accuracy += distance * 0.4 / sum(len(w) for w in qwords)
-
async def lookup_pois(self, categories: List[Tuple[str, str]],
phrases: List[Phrase]) -> SearchResults:
""" Look up places by category. If phrase is given, a place search
return results
-
async def lookup(self, phrases: List[Phrase]) -> SearchResults:
""" Look up a single free-text query.
"""
return results
-# pylint: disable=invalid-name,too-many-locals
def _dump_searches(searches: List[AbstractSearch], query: QueryStruct,
start: int = 0) -> Iterator[Optional[List[Any]]]:
yield ['Penalty', 'Lookups', 'Housenr', 'Postcode', 'Countries',
ranks = ranks[:100] + '...'
return f"{f.column}({ranks},def={f.default:.3g})"
- def fmt_lookup(l: Any) -> str:
- if not l:
+ def fmt_lookup(lk: Any) -> str:
+ if not lk:
return ''
- return f"{l.lookup_type}({l.column}{tk(l.tokens)})"
-
+ return f"{lk.lookup_type}({lk.column}{tk(lk.tokens)})"
def fmt_cstr(c: Any) -> str:
if not c:
QueryParts = List[QueryPart]
WordDict = Dict[str, List[qmod.TokenRange]]
+
def yield_words(terms: List[QueryPart], start: int) -> Iterator[Tuple[str, qmod.TokenRange]]:
""" Return all combinations of words in the terms list after the
given position.
assert self.info
return self.info.get('class', ''), self.info.get('type', '')
-
def rematch(self, norm: str) -> None:
""" Check how well the token matches the given normalized string
and add a penalty, if necessary.
distance += abs((ato-afrom) - (bto-bfrom))
self.penalty += (distance/len(self.lookup_word))
-
@staticmethod
def from_db_row(row: SaRow) -> 'ICUToken':
""" Create a ICUToken from the row of the word table.
addr_count=max(1, addr_count))
-
class ICUQueryAnalyzer(AbstractQueryAnalyzer):
""" Converter for query strings into a tokenized query
using the tokens created by an ICU tokenizer.
"""
-
def __init__(self, conn: SearchConnection) -> None:
self.conn = conn
-
async def setup(self) -> None:
""" Set up static data structures needed for the analysis.
"""
sa.Column('word', sa.Text),
sa.Column('info', Json))
-
async def analyze_query(self, phrases: List[qmod.Phrase]) -> qmod.QueryStruct:
""" Analyze the given list of phrases and return the
tokenized query.
return query
-
def normalize_text(self, text: str) -> str:
""" Bring the given text into a normalized form. That is the
standardized form search will work with. All information removed
"""
return cast(str, self.normalizer.transliterate(text))
-
def split_query(self, query: qmod.QueryStruct) -> Tuple[QueryParts, WordDict]:
""" Transliterate the phrases and split them into tokens.
return parts, words
-
async def lookup_in_db(self, words: List[str]) -> 'sa.Result[Any]':
""" Return the token information from the database for the
given word tokens.
t = self.conn.t.meta.tables['word']
return await self.conn.execute(t.select().where(t.c.word_token.in_(words)))
-
def add_extra_tokens(self, query: qmod.QueryStruct, parts: QueryParts) -> None:
""" Add tokens to query that are not saved in the database.
"""
count=1, addr_count=1, lookup_word=part.token,
word_token=part.token, info=None))
-
def rerank_tokens(self, query: qmod.QueryStruct, parts: QueryParts) -> None:
""" Add penalties to tokens that depend on presence of other token.
"""
and (repl.ttype != qmod.TokenType.HOUSENUMBER
or len(tlist.tokens[0].lookup_word) > 4):
repl.add_penalty(0.39)
- elif tlist.ttype == qmod.TokenType.HOUSENUMBER \
- and len(tlist.tokens[0].lookup_word) <= 3:
+ elif (tlist.ttype == qmod.TokenType.HOUSENUMBER
+ and len(tlist.tokens[0].lookup_word) <= 3):
if any(c.isdigit() for c in tlist.tokens[0].lookup_word):
for repl in node.starting:
if repl.end == tlist.end and repl.ttype != qmod.TokenType.HOUSENUMBER:
import dataclasses
import enum
+
class BreakType(enum.Enum):
""" Type of break between tokens.
"""
addr_count: int
lookup_word: str
-
@abstractmethod
def get_category(self) -> Tuple[str, str]:
""" Return the category restriction for qualifier terms and
category objects.
"""
+
@dataclasses.dataclass
class TokenRange:
""" Indexes of query nodes over which a token spans.
def __lt__(self, other: 'TokenRange') -> bool:
return self.end <= other.start
-
def __le__(self, other: 'TokenRange') -> bool:
return NotImplemented
-
def __gt__(self, other: 'TokenRange') -> bool:
return self.start >= other.end
-
def __ge__(self, other: 'TokenRange') -> bool:
return NotImplemented
-
def replace_start(self, new_start: int) -> 'TokenRange':
""" Return a new token range with the new start.
"""
return TokenRange(new_start, self.end)
-
def replace_end(self, new_end: int) -> 'TokenRange':
""" Return a new token range with the new end.
"""
return TokenRange(self.start, new_end)
-
def split(self, index: int) -> Tuple['TokenRange', 'TokenRange']:
""" Split the span into two spans at the given index.
The index must be within the span.
ttype: TokenType
tokens: List[Token]
-
def add_penalty(self, penalty: float) -> None:
""" Add the given penalty to all tokens in the list.
"""
"""
return any(tl.end == end and tl.ttype in ttypes for tl in self.starting)
-
def get_tokens(self, end: int, ttype: TokenType) -> Optional[List[Token]]:
""" Get the list of tokens of the given type starting at this node
and ending at the node 'end'. Returns 'None' if no such
self.nodes: List[QueryNode] = \
[QueryNode(BreakType.START, source[0].ptype if source else PhraseType.NONE)]
-
def num_token_slots(self) -> int:
""" Return the length of the query in vertice steps.
"""
return len(self.nodes) - 1
-
def add_node(self, btype: BreakType, ptype: PhraseType) -> None:
""" Append a new break node with the given break type.
The phrase type denotes the type for any tokens starting
"""
self.nodes.append(QueryNode(btype, ptype))
-
def add_token(self, trange: TokenRange, ttype: TokenType, token: Token) -> None:
""" Add a token to the query. 'start' and 'end' are the indexes of the
nodes between which the token spans. The indexes must exist
"""
snode = self.nodes[trange.start]
full_phrase = snode.btype in (BreakType.START, BreakType.PHRASE)\
- and self.nodes[trange.end].btype in (BreakType.PHRASE, BreakType.END)
+ and self.nodes[trange.end].btype in (BreakType.PHRASE, BreakType.END)
if snode.ptype.compatible_with(ttype, full_phrase):
tlist = snode.get_tokens(trange.end, ttype)
if tlist is None:
else:
tlist.append(token)
-
def get_tokens(self, trange: TokenRange, ttype: TokenType) -> List[Token]:
""" Get the list of tokens of a given type, spanning the given
nodes. The nodes must exist. If no tokens exist, an
"""
return self.nodes[trange.start].get_tokens(trange.end, ttype) or []
-
def get_partials_list(self, trange: TokenRange) -> List[Token]:
""" Create a list of partial tokens between the given nodes.
The list is composed of the first token of type PARTIAL
assumed to exist.
"""
return [next(iter(self.get_tokens(TokenRange(i, i+1), TokenType.PARTIAL)))
- for i in range(trange.start, trange.end)]
-
+ for i in range(trange.start, trange.end)]
def iter_token_lists(self) -> Iterator[Tuple[int, QueryNode, TokenList]]:
""" Iterator over all token lists in the query.
for tlist in node.starting:
yield i, node, tlist
-
def find_lookup_word_by_id(self, token: int) -> str:
""" Find the first token with the given token ID and return
its lookup word. Returns 'None' if no such token exists.
if TYPE_CHECKING:
from .query import Phrase, QueryStruct
+
class AbstractQueryAnalyzer(ABC):
""" Class for analysing incoming queries.
""" Analyze the given phrases and return the tokenized query.
"""
-
@abstractmethod
def normalize_text(self, text: str) -> str:
""" Bring the given text into a normalized form. That is the
"""
-
async def make_query_analyzer(conn: SearchConnection) -> AbstractQueryAnalyzer:
""" Create a query analyzer for the tokenizer used by the database.
"""
from ..logging import log
from . import query as qmod
-# pylint: disable=too-many-return-statements,too-many-branches
@dataclasses.dataclass
class TypedRange:
TypedRangeSeq = List[TypedRange]
+
@dataclasses.dataclass
-class TokenAssignment: # pylint: disable=too-many-instance-attributes
+class TokenAssignment:
""" Representation of a possible assignment of token types
to the tokens in a tokenized query.
"""
near_item: Optional[qmod.TokenRange] = None
qualifier: Optional[qmod.TokenRange] = None
-
@staticmethod
def from_ranges(ranges: TypedRangeSeq) -> 'TokenAssignment':
""" Create a new token assignment from a sequence of typed spans.
self.direction = direction
self.penalty = penalty
-
def __str__(self) -> str:
seq = ''.join(f'[{r.trange.start} - {r.trange.end}: {r.ttype.name}]' for r in self.seq)
return f'{seq} (dir: {self.direction}, penalty: {self.penalty})'
-
@property
def end_pos(self) -> int:
""" Return the index of the global end of the current sequence.
"""
return self.seq[-1].trange.end if self.seq else 0
-
def has_types(self, *ttypes: qmod.TokenType) -> bool:
""" Check if the current sequence contains any typed ranges of
the given types.
"""
return any(s.ttype in ttypes for s in self.seq)
-
def is_final(self) -> bool:
""" Return true when the sequence cannot be extended by any
form of token anymore.
"""
# Country and category must be the final term for left-to-right
return len(self.seq) > 1 and \
- self.seq[-1].ttype in (qmod.TokenType.COUNTRY, qmod.TokenType.NEAR_ITEM)
-
+ self.seq[-1].ttype in (qmod.TokenType.COUNTRY, qmod.TokenType.NEAR_ITEM)
def appendable(self, ttype: qmod.TokenType) -> Optional[int]:
""" Check if the give token type is appendable to the existing sequence.
return None
if len(self.seq) > 2 \
or self.has_types(qmod.TokenType.POSTCODE, qmod.TokenType.COUNTRY):
- return None # direction left-to-right: housenumber must come before anything
- elif self.direction == -1 \
- or self.has_types(qmod.TokenType.POSTCODE, qmod.TokenType.COUNTRY):
- return -1 # force direction right-to-left if after other terms
+ return None # direction left-to-right: housenumber must come before anything
+ elif (self.direction == -1
+ or self.has_types(qmod.TokenType.POSTCODE, qmod.TokenType.COUNTRY)):
+ return -1 # force direction right-to-left if after other terms
return self.direction
return None
-
def advance(self, ttype: qmod.TokenType, end_pos: int,
btype: qmod.BreakType) -> Optional['_TokenSequence']:
""" Return a new token sequence state with the given token type
return _TokenSequence(newseq, newdir, self.penalty + new_penalty)
-
def _adapt_penalty_from_priors(self, priors: int, new_dir: int) -> bool:
if priors >= 2:
if self.direction == 0:
return True
-
def recheck_sequence(self) -> bool:
""" Check that the sequence is a fully valid token assignment
and adapt direction and penalties further if necessary.
return True
-
def _get_assignments_postcode(self, base: TokenAssignment,
- query_len: int) -> Iterator[TokenAssignment]:
+ query_len: int) -> Iterator[TokenAssignment]:
""" Yield possible assignments of Postcode searches with an
address component.
"""
# <address>,<postcode> should give preference to address search
if base.postcode.start == 0:
penalty = self.penalty
- self.direction = -1 # name searches are only possible backwards
+ self.direction = -1 # name searches are only possible backwards
else:
penalty = self.penalty + 0.1
- self.direction = 1 # name searches are only possible forwards
+ self.direction = 1 # name searches are only possible forwards
yield dataclasses.replace(base, penalty=penalty)
-
def _get_assignments_address_forward(self, base: TokenAssignment,
query: qmod.QueryStruct) -> Iterator[TokenAssignment]:
""" Yield possible assignments of address searches with
yield dataclasses.replace(base, name=name, address=[addr] + base.address[1:],
penalty=penalty + PENALTY_TOKENCHANGE[query.nodes[i].btype])
-
def _get_assignments_address_backward(self, base: TokenAssignment,
query: qmod.QueryStruct) -> Iterator[TokenAssignment]:
""" Yield possible assignments of address searches with
yield dataclasses.replace(base, name=name, address=base.address[:-1] + [addr],
penalty=penalty + PENALTY_TOKENCHANGE[query.nodes[i].btype])
-
def get_assignments(self, query: qmod.QueryStruct) -> Iterator[TokenAssignment]:
""" Yield possible assignments for the current sequence.
from ..result_formatting import FormatDispatcher
from .content_types import CONTENT_TEXT
+
class ASGIAdaptor(abc.ABC):
""" Adapter class for the different ASGI frameworks.
Wraps functionality over concrete requests and responses.
"""
content_type: str = CONTENT_TEXT
-
@abc.abstractmethod
def get(self, name: str, default: Optional[str] = None) -> Optional[str]:
""" Return an input parameter as a string. If the parameter was
not provided, return the 'default' value.
"""
-
@abc.abstractmethod
def error(self, msg: str, status: int = 400) -> Exception:
""" Construct an appropriate exception from the given error message.
The exception must result in an HTTP error with the given status.
"""
-
@abc.abstractmethod
def create_response(self, status: int, output: str, num_results: int) -> Any:
""" Create a response from the given parameters. The result will
body of the response to 'output'.
"""
-
@abc.abstractmethod
def base_uri(self) -> str:
""" Return the URI of the original request.
"""
-
@abc.abstractmethod
def config(self) -> Configuration:
""" Return the current configuration object.
"""
-
@abc.abstractmethod
def formatting(self) -> FormatDispatcher:
""" Return the formatting object to use.
"""
-
def get_int(self, name: str, default: Optional[int] = None) -> int:
""" Return an input parameter as an int. Raises an exception if
the parameter is given but not in an integer format.
return intval
-
def get_float(self, name: str, default: Optional[float] = None) -> float:
""" Return an input parameter as a flaoting-point number. Raises an
exception if the parameter is given but not in an float format.
return fval
-
def get_bool(self, name: str, default: Optional[bool] = None) -> bool:
""" Return an input parameter as bool. Only '0' is accepted as
an input for 'false'; all other inputs will be interpreted as 'true'.
return value != '0'
-
def raise_error(self, msg: str, status: int = 400) -> NoReturn:
""" Raise an exception resulting in the given HTTP status and
message. The message will be formatted according to the
from ... import logging as loglib
from ..asgi_adaptor import ASGIAdaptor, EndpointFunc
+
class HTTPNominatimError(Exception):
""" A special exception class for errors raised during processing.
"""
self.content_type = content_type
-async def nominatim_error_handler(req: Request, resp: Response, #pylint: disable=unused-argument
+async def nominatim_error_handler(req: Request, resp: Response,
exception: HTTPNominatimError,
_: Any) -> None:
""" Special error handler that passes message and content type as
resp.content_type = exception.content_type
-async def timeout_error_handler(req: Request, resp: Response, #pylint: disable=unused-argument
- exception: TimeoutError, #pylint: disable=unused-argument
+async def timeout_error_handler(req: Request, resp: Response,
+ exception: TimeoutError,
_: Any) -> None:
""" Special error handler that passes message and content type as
per exception info.
self._config = config
self._formatter = formatter
-
def get(self, name: str, default: Optional[str] = None) -> Optional[str]:
return self.request.get_param(name, default=default)
-
def get_header(self, name: str, default: Optional[str] = None) -> Optional[str]:
return self.request.get_header(name, default=default)
-
def error(self, msg: str, status: int = 400) -> HTTPNominatimError:
return HTTPNominatimError(msg, status, self.content_type)
-
def create_response(self, status: int, output: str, num_results: int) -> None:
self.response.context.num_results = num_results
self.response.status = status
self.response.text = output
self.response.content_type = self.content_type
-
def base_uri(self) -> str:
return self.request.forwarded_prefix
self.api = api
self.formatter = formatter
-
async def on_get(self, req: Request, resp: Response) -> None:
""" Implementation of the endpoint.
"""
"""
def __init__(self, file_name: str):
- self.fd = open(file_name, 'a', buffering=1, encoding='utf8') # pylint: disable=R1732
-
+ self.fd = open(file_name, 'a', buffering=1, encoding='utf8')
async def process_request(self, req: Request, _: Response) -> None:
""" Callback before the request starts timing.
"""
req.context.start = dt.datetime.now(tz=dt.timezone.utc)
-
async def process_response(self, req: Request, resp: Response,
resource: Optional[EndpointWrapper],
req_succeeded: bool) -> None:
writes logs for successful requests for search, reverse and lookup.
"""
if not req_succeeded or resource is None or resp.status != 200\
- or resource.name not in ('reverse', 'search', 'lookup', 'details'):
+ or resource.name not in ('reverse', 'search', 'lookup', 'details'):
return
finish = dt.datetime.now(tz=dt.timezone.utc)
app.add_error_handler(HTTPNominatimError, nominatim_error_handler)
app.add_error_handler(TimeoutError, timeout_error_handler)
# different from TimeoutError in Python <= 3.10
- app.add_error_handler(asyncio.TimeoutError, timeout_error_handler) # type: ignore[arg-type]
+ app.add_error_handler(asyncio.TimeoutError, timeout_error_handler) # type: ignore[arg-type]
legacy_urls = api.config.get_bool('SERVE_LEGACY_URLS')
formatter = load_format_dispatcher('v1', project_dir)
from ..asgi_adaptor import ASGIAdaptor, EndpointFunc
from ... import logging as loglib
+
class ParamWrapper(ASGIAdaptor):
""" Adaptor class for server glue to Starlette framework.
"""
def __init__(self, request: Request) -> None:
self.request = request
-
def get(self, name: str, default: Optional[str] = None) -> Optional[str]:
return self.request.query_params.get(name, default=default)
-
def get_header(self, name: str, default: Optional[str] = None) -> Optional[str]:
return self.request.headers.get(name, default)
-
def error(self, msg: str, status: int = 400) -> HTTPException:
return HTTPException(status, detail=msg,
headers={'content-type': self.content_type})
-
def create_response(self, status: int, output: str, num_results: int) -> Response:
self.request.state.num_results = num_results
return Response(output, status_code=status, media_type=self.content_type)
-
def base_uri(self) -> str:
scheme = self.request.url.scheme
host = self.request.url.hostname
return f"{scheme}://{host}{root}"
-
def config(self) -> Configuration:
return cast(Configuration, self.request.app.state.API.config)
-
def formatting(self) -> FormatDispatcher:
return cast(FormatDispatcher, self.request.app.state.API.formatter)
def __init__(self, app: Starlette, file_name: str = ''):
super().__init__(app)
- self.fd = open(file_name, 'a', buffering=1, encoding='utf8') # pylint: disable=R1732
+ self.fd = open(file_name, 'a', buffering=1, encoding='utf8')
async def dispatch(self, request: Request,
call_next: RequestResponseEndpoint) -> Response:
return response
-async def timeout_error(request: Request, #pylint: disable=unused-argument
+async def timeout_error(request: Request,
_: Exception) -> Response:
""" Error handler for query timeouts.
"""
"""
Import the base library to use with asynchronous SQLAlchemy.
"""
-# pylint: disable=invalid-name, ungrouped-imports, unused-import
-
from typing import Any
+# flake8: noqa
+
try:
import sqlalchemy.dialects.postgresql.psycopg
import psycopg
from ..typing import SaColumn
-# pylint: disable=all
class PlacexGeometryReverseLookuppolygon(sa.sql.functions.GenericFunction[Any]):
""" Check for conditions that allow partial index use on
f" AND {table}.name is not null"\
f" AND {table}.linked_place_id is null"\
f" AND {table}.osm_type = 'N'" + \
- " AND ST_Buffer(%s, reverse_place_diameter(%s)) && %s)" % \
- tuple(map(lambda c: compiler.process(c, **kw), element.clauses))
+ " AND ST_Buffer(%s, reverse_place_diameter(%s)) && %s)" \
+ % tuple(map(lambda c: compiler.process(c, **kw), element.clauses))
@compiles(IntersectsReverseDistance, 'sqlite')
geom1, rank, geom2 = list(element.clauses)
table = element.tablename
- return (f"({table}.rank_address between 4 and 25"\
- f" AND {table}.type != 'postcode'"\
- f" AND {table}.name is not null"\
- f" AND {table}.linked_place_id is null"\
- f" AND {table}.osm_type = 'N'"\
- " AND MbrIntersects(%s, ST_Expand(%s, 14.0 * exp(-0.2 * %s) - 0.03))"\
- f" AND {table}.place_id IN"\
- " (SELECT place_id FROM placex_place_node_areas"\
- " WHERE ROWID IN (SELECT ROWID FROM SpatialIndex"\
- " WHERE f_table_name = 'placex_place_node_areas'"\
- " AND search_frame = %s)))") % (
+ return (f"({table}.rank_address between 4 and 25"
+ f" AND {table}.type != 'postcode'"
+ f" AND {table}.name is not null"
+ f" AND {table}.linked_place_id is null"
+ f" AND {table}.osm_type = 'N'"
+ " AND MbrIntersects(%s, ST_Expand(%s, 14.0 * exp(-0.2 * %s) - 0.03))"
+ f" AND {table}.place_id IN"
+ " (SELECT place_id FROM placex_place_node_areas"
+ " WHERE ROWID IN (SELECT ROWID FROM SpatialIndex"
+ " WHERE f_table_name = 'placex_place_node_areas'"
+ " AND search_frame = %s)))") % (
compiler.process(geom1, **kw),
compiler.process(geom2, **kw),
compiler.process(rank, **kw),
name = 'CrosscheckNames'
inherit_cache = True
+
@compiles(CrosscheckNames)
def compile_crosscheck_names(element: CrosscheckNames,
compiler: 'sa.Compiled', **kw: Any) -> str:
return "json_each(%s)" % compiler.process(element.clauses, **kw)
-
class Greatest(sa.sql.functions.GenericFunction[Any]):
""" Function to compute maximum of all its input parameters.
"""
return "max(%s)" % compiler.process(element.clauses, **kw)
-
class RegexpWord(sa.sql.functions.GenericFunction[Any]):
""" Check if a full word is in a given string.
"""
@compiles(RegexpWord, 'postgresql')
def postgres_regexp_nocase(element: RegexpWord, compiler: 'sa.Compiled', **kw: Any) -> str:
arg1, arg2 = list(element.clauses)
- return "%s ~* ('\\m(' || %s || ')\\M')::text" % (compiler.process(arg2, **kw), compiler.process(arg1, **kw))
+ return "%s ~* ('\\m(' || %s || ')\\M')::text" \
+ % (compiler.process(arg2, **kw), compiler.process(arg1, **kw))
@compiles(RegexpWord, 'sqlite')
def sqlite_regexp_nocase(element: RegexpWord, compiler: 'sa.Compiled', **kw: Any) -> str:
arg1, arg2 = list(element.clauses)
- return "regexp('\\b(' || %s || ')\\b', %s)" % (compiler.process(arg1, **kw), compiler.process(arg2, **kw))
+ return "regexp('\\b(' || %s || ')\\b', %s)"\
+ % (compiler.process(arg1, **kw), compiler.process(arg2, **kw))
from .sqlalchemy_types import Geometry, KeyValueStore, IntArray
-#pylint: disable=too-many-instance-attributes
+
class SearchTables:
""" Data class that holds the tables of the Nominatim database.
def __init__(self, meta: sa.MetaData) -> None:
self.meta = meta
- self.import_status = sa.Table('import_status', meta,
+ self.import_status = sa.Table(
+ 'import_status', meta,
sa.Column('lastimportdate', sa.DateTime(True), nullable=False),
sa.Column('sequence_id', sa.Integer),
sa.Column('indexed', sa.Boolean))
- self.properties = sa.Table('nominatim_properties', meta,
+ self.properties = sa.Table(
+ 'nominatim_properties', meta,
sa.Column('property', sa.Text, nullable=False),
sa.Column('value', sa.Text))
- self.placex = sa.Table('placex', meta,
+ self.placex = sa.Table(
+ 'placex', meta,
sa.Column('place_id', sa.BigInteger, nullable=False),
sa.Column('parent_place_id', sa.BigInteger),
sa.Column('linked_place_id', sa.BigInteger),
sa.Column('postcode', sa.Text),
sa.Column('centroid', Geometry))
- self.addressline = sa.Table('place_addressline', meta,
+ self.addressline = sa.Table(
+ 'place_addressline', meta,
sa.Column('place_id', sa.BigInteger),
sa.Column('address_place_id', sa.BigInteger),
sa.Column('distance', sa.Float),
sa.Column('fromarea', sa.Boolean),
sa.Column('isaddress', sa.Boolean))
- self.postcode = sa.Table('location_postcode', meta,
+ self.postcode = sa.Table(
+ 'location_postcode', meta,
sa.Column('place_id', sa.BigInteger),
sa.Column('parent_place_id', sa.BigInteger),
sa.Column('rank_search', sa.SmallInteger),
sa.Column('postcode', sa.Text),
sa.Column('geometry', Geometry))
- self.osmline = sa.Table('location_property_osmline', meta,
+ self.osmline = sa.Table(
+ 'location_property_osmline', meta,
sa.Column('place_id', sa.BigInteger, nullable=False),
sa.Column('osm_id', sa.BigInteger),
sa.Column('parent_place_id', sa.BigInteger),
sa.Column('postcode', sa.Text),
sa.Column('country_code', sa.String(2)))
- self.country_name = sa.Table('country_name', meta,
+ self.country_name = sa.Table(
+ 'country_name', meta,
sa.Column('country_code', sa.String(2)),
sa.Column('name', KeyValueStore),
sa.Column('derived_name', KeyValueStore),
sa.Column('partition', sa.Integer))
- self.country_grid = sa.Table('country_osm_grid', meta,
+ self.country_grid = sa.Table(
+ 'country_osm_grid', meta,
sa.Column('country_code', sa.String(2)),
sa.Column('area', sa.Float),
sa.Column('geometry', Geometry))
# The following tables are not necessarily present.
- self.search_name = sa.Table('search_name', meta,
+ self.search_name = sa.Table(
+ 'search_name', meta,
sa.Column('place_id', sa.BigInteger),
sa.Column('importance', sa.Float),
sa.Column('search_rank', sa.SmallInteger),
sa.Column('country_code', sa.String(2)),
sa.Column('centroid', Geometry))
- self.tiger = sa.Table('location_property_tiger', meta,
+ self.tiger = sa.Table(
+ 'location_property_tiger', meta,
sa.Column('place_id', sa.BigInteger),
sa.Column('parent_place_id', sa.BigInteger),
sa.Column('startnumber', sa.Integer),
"""
from __future__ import annotations
from typing import Callable, Any, cast
-import sys
import sqlalchemy as sa
from sqlalchemy.ext.compiler import compiles
from ...typing import SaColumn, SaBind
-#pylint: disable=all
class Geometry_DistanceSpheroid(sa.sql.expression.FunctionElement[float]):
""" Function to compute the spherical distance in meters.
arg1, arg2 = list(element.clauses)
return "MbrIntersects(%s, %s) = 1 and "\
"%s.ROWID IN (SELECT ROWID FROM SpatialIndex "\
- "WHERE f_table_name = '%s' AND f_geometry_column = '%s' "\
- "AND search_frame = %s)" %(
- compiler.process(arg1, **kw),
- compiler.process(arg2, **kw),
- arg1.table.name, arg1.table.name, arg1.name,
- compiler.process(arg2, **kw))
+ " WHERE f_table_name = '%s' AND f_geometry_column = '%s' "\
+ " AND search_frame = %s)"\
+ % (compiler.process(arg1, **kw),
+ compiler.process(arg2, **kw),
+ arg1.table.name, arg1.table.name, arg1.name,
+ compiler.process(arg2, **kw))
class Geometry_ColumnDWithin(sa.sql.expression.FunctionElement[Any]):
compiler: 'sa.Compiled', **kw: Any) -> str:
return "ST_DWithin(%s)" % compiler.process(element.clauses, **kw)
+
@compiles(Geometry_ColumnDWithin, 'sqlite')
def spatialite_dwithin_column(element: Geometry_ColumnDWithin,
compiler: 'sa.Compiled', **kw: Any) -> str:
geom1, geom2, dist = list(element.clauses)
return "ST_Distance(%s, %s) < %s and "\
"%s.ROWID IN (SELECT ROWID FROM SpatialIndex "\
- "WHERE f_table_name = '%s' AND f_geometry_column = '%s' "\
- "AND search_frame = ST_Expand(%s, %s))" %(
- compiler.process(geom1, **kw),
- compiler.process(geom2, **kw),
- compiler.process(dist, **kw),
- geom1.table.name, geom1.table.name, geom1.name,
- compiler.process(geom2, **kw),
- compiler.process(dist, **kw))
+ " WHERE f_table_name = '%s' AND f_geometry_column = '%s' "\
+ " AND search_frame = ST_Expand(%s, %s))"\
+ % (compiler.process(geom1, **kw),
+ compiler.process(geom2, **kw),
+ compiler.process(dist, **kw),
+ geom1.table.name, geom1.table.name, geom1.name,
+ compiler.process(geom2, **kw),
+ compiler.process(dist, **kw))
-class Geometry(types.UserDefinedType): # type: ignore[type-arg]
+class Geometry(types.UserDefinedType): # type: ignore[type-arg]
""" Simplified type decorator for PostGIS geometry. This type
only supports geometries in 4326 projection.
"""
def __init__(self, subtype: str = 'Geometry'):
self.subtype = subtype
-
def get_col_spec(self) -> str:
return f'GEOMETRY({self.subtype}, 4326)'
-
def bind_processor(self, dialect: 'sa.Dialect') -> Callable[[Any], str]:
def process(value: Any) -> str:
if isinstance(value, str):
return cast(str, value.to_wkt())
return process
-
def result_processor(self, dialect: 'sa.Dialect', coltype: object) -> Callable[[Any], str]:
def process(value: Any) -> str:
assert isinstance(value, str)
return value
return process
-
def column_expression(self, col: SaColumn) -> SaColumn:
return sa.func.ST_AsEWKB(col)
-
def bind_expression(self, bindvalue: SaBind) -> SaColumn:
return sa.func.ST_GeomFromText(bindvalue, sa.text('4326'), type_=self)
-
- class comparator_factory(types.UserDefinedType.Comparator): # type: ignore[type-arg]
+ class comparator_factory(types.UserDefinedType.Comparator): # type: ignore[type-arg]
def intersects(self, other: SaColumn, use_index: bool = True) -> 'sa.Operators':
if not use_index:
return Geometry_IntersectsBbox(self.expr, other)
-
def is_line_like(self) -> SaColumn:
return Geometry_IsLineLike(self)
-
def is_area(self) -> SaColumn:
return Geometry_IsAreaLike(self)
-
def within_distance(self, other: SaColumn, distance: SaColumn) -> SaColumn:
if isinstance(self.expr, sa.Column):
return Geometry_ColumnDWithin(self.expr, other, distance)
return self.ST_Distance(other) < distance
-
def ST_Distance(self, other: SaColumn) -> SaColumn:
return sa.func.ST_Distance(self, other, type_=sa.Float)
-
def ST_Contains(self, other: SaColumn) -> SaColumn:
return sa.func.ST_Contains(self, other, type_=sa.Boolean)
-
def ST_CoveredBy(self, other: SaColumn) -> SaColumn:
return sa.func.ST_CoveredBy(self, other, type_=sa.Boolean)
-
def ST_ClosestPoint(self, other: SaColumn) -> SaColumn:
return sa.func.coalesce(sa.func.ST_ClosestPoint(self, other, type_=Geometry),
other)
-
def ST_Buffer(self, other: SaColumn) -> SaColumn:
return sa.func.ST_Buffer(self, other, type_=Geometry)
-
def ST_Expand(self, other: SaColumn) -> SaColumn:
return sa.func.ST_Expand(self, other, type_=Geometry)
-
def ST_Collect(self) -> SaColumn:
return sa.func.ST_Collect(self, type_=Geometry)
-
def ST_Centroid(self) -> SaColumn:
return sa.func.ST_Centroid(self, type_=Geometry)
-
def ST_LineInterpolatePoint(self, other: SaColumn) -> SaColumn:
return sa.func.ST_LineInterpolatePoint(self, other, type_=Geometry)
-
def ST_LineLocatePoint(self, other: SaColumn) -> SaColumn:
return sa.func.ST_LineLocatePoint(self, other, type_=sa.Float)
-
def distance_spheroid(self, other: SaColumn) -> SaColumn:
return Geometry_DistanceSpheroid(self, other)
@compiles(Geometry, 'sqlite')
-def get_col_spec(self, *args, **kwargs): # type: ignore[no-untyped-def]
+def get_col_spec(self, *args, **kwargs): # type: ignore[no-untyped-def]
return 'GEOMETRY'
('ST_LineInterpolatePoint', sa.Float, 'ST_Line_Interpolate_Point'),
)
+
def _add_function_alias(func: str, ftype: type, alias: str) -> None:
_FuncDef = type(func, (sa.sql.functions.GenericFunction, ), {
"type": ftype(),
compiles(_FuncDef, 'sqlite')(_sqlite_impl)
+
for alias in SQLITE_FUNCTION_ALIAS:
_add_function_alias(*alias)
"""
Custom type for an array of integers.
"""
-from typing import Any, List, cast, Optional
+from typing import Any, List, Optional
import sqlalchemy as sa
from sqlalchemy.ext.compiler import compiles
from ...typing import SaDialect, SaColumn
-# pylint: disable=all
class IntList(sa.types.TypeDecorator[Any]):
""" A list of integers saved as a text of comma-separated numbers.
def load_dialect_impl(self, dialect: SaDialect) -> sa.types.TypeEngine[Any]:
if dialect.name == 'postgresql':
- return ARRAY(sa.Integer()) #pylint: disable=invalid-name
+ return ARRAY(sa.Integer())
return IntList()
-
- class comparator_factory(sa.types.UserDefinedType.Comparator): # type: ignore[type-arg]
+ class comparator_factory(sa.types.UserDefinedType.Comparator): # type: ignore[type-arg]
def __add__(self, other: SaColumn) -> 'sa.ColumnOperators':
""" Concate the array with the given array. If one of the
"""
return ArrayCat(self.expr, other)
-
def contains(self, other: SaColumn, **kwargs: Any) -> 'sa.ColumnOperators':
""" Return true if the array contains all the value of the argument
array.
return ArrayContains(self.expr, other)
-
class ArrayAgg(sa.sql.functions.GenericFunction[Any]):
""" Aggregate function to collect elements in an array.
"""
return "group_concat(%s, ',')" % compiler.process(element.clauses, **kw)
-
class ArrayContains(sa.sql.expression.FunctionElement[Any]):
""" Function to check if an array is fully contained in another.
"""
return "array_contains(%s)" % compiler.process(element.clauses, **kw)
-
class ArrayCat(sa.sql.expression.FunctionElement[Any]):
""" Function to check if an array is fully contained in another.
"""
def sqlite_array_cat(element: ArrayCat, compiler: 'sa.Compiled', **kw: Any) -> str:
arg1, arg2 = list(element.clauses)
return "(%s || ',' || %s)" % (compiler.process(arg1, **kw), compiler.process(arg2, **kw))
-
from ...typing import SaDialect
-# pylint: disable=all
class Json(sa.types.TypeDecorator[Any]):
""" Dialect-independent type for JSON.
def load_dialect_impl(self, dialect: SaDialect) -> sa.types.TypeEngine[Any]:
if dialect.name == 'postgresql':
- return JSONB(none_as_null=True) # type: ignore[no-untyped-call]
+ return JSONB(none_as_null=True) # type: ignore[no-untyped-call]
return sqlite_json(none_as_null=True)
from ...typing import SaDialect, SaColumn
-# pylint: disable=all
class KeyValueStore(sa.types.TypeDecorator[Any]):
""" Dialect-independent type of a simple key-value store of strings.
def load_dialect_impl(self, dialect: SaDialect) -> sa.types.TypeEngine[Any]:
if dialect.name == 'postgresql':
- return HSTORE() # type: ignore[no-untyped-call]
+ return HSTORE() # type: ignore[no-untyped-call]
return sqlite_json(none_as_null=True)
-
- class comparator_factory(sa.types.UserDefinedType.Comparator): # type: ignore[type-arg]
+ class comparator_factory(sa.types.UserDefinedType.Comparator): # type: ignore[type-arg]
def merge(self, other: SaColumn) -> 'sa.Operators':
""" Merge the values from the given KeyValueStore into this
name = 'JsonConcat'
inherit_cache = True
+
@compiles(KeyValueConcat)
def default_json_concat(element: KeyValueConcat, compiler: 'sa.Compiled', **kw: Any) -> str:
arg1, arg2 = list(element.clauses)
- return "(%s || coalesce(%s, ''::hstore))" % (compiler.process(arg1, **kw), compiler.process(arg2, **kw))
+ return "(%s || coalesce(%s, ''::hstore))"\
+ % (compiler.process(arg1, **kw), compiler.process(arg2, **kw))
+
@compiles(KeyValueConcat, 'sqlite')
def sqlite_json_concat(element: KeyValueConcat, compiler: 'sa.Compiled', **kw: Any) -> str:
arg1, arg2 = list(element.clauses)
- return "json_patch(%s, coalesce(%s, '{}'))" % (compiler.process(arg1, **kw), compiler.process(arg2, **kw))
-
-
-
+ return "json_patch(%s, coalesce(%s, '{}'))"\
+ % (compiler.process(arg1, **kw), compiler.process(arg2, **kw))
from typing import cast, Optional, Set, Any
import json
-# pylint: disable=protected-access
def weigh_search(search_vector: Optional[str], rankings: str, default: float) -> float:
""" Custom weight function for search results.
def _create_aggregate(conn: Any, name: str, nargs: int, aggregate: Any) -> None:
try:
conn.await_(_make_aggregate(conn._connection, name, nargs, aggregate))
- except Exception as error: # pylint: disable=broad-exception-caught
+ except Exception as error:
conn._handle_exception(error)
from .connection import SearchConnection
from .version import NOMINATIM_API_VERSION
+
@dataclasses.dataclass
class StatusResult:
""" Result of a call to the status API.
from .errors import UsageError
from .localization import Locales
-# pylint: disable=no-member,too-many-boolean-expressions,too-many-instance-attributes
@dataclasses.dataclass
class PlaceID:
x: float
y: float
-
@property
def lat(self) -> float:
""" Return the latitude of the point.
"""
return self.y
-
@property
def lon(self) -> float:
""" Return the longitude of the point.
"""
return self.x
-
def to_geojson(self) -> str:
""" Return the point in GeoJSON format.
"""
return f'{{"type": "Point","coordinates": [{self.x}, {self.y}]}}'
-
@staticmethod
def from_wkb(wkb: Union[str, bytes]) -> 'Point':
""" Create a point from EWKB as returned from the database.
return Point(x, y)
-
@staticmethod
def from_param(inp: Any) -> 'Point':
""" Create a point from an input parameter. The parameter
return Point(x, y)
-
def to_wkt(self) -> str:
""" Return the WKT representation of the point.
"""
return f'POINT({self.x} {self.y})'
-
AnyPoint = Union[Point, Tuple[float, float]]
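# Precomputed EWKB headers for a polygon in SRID 4326 with a single ring of
# five points: endianness flag, geometry type 3 (Polygon) with the SRID bit set,
# SRID 4326, ring count 1, point count 5.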
WKB_BBOX_HEADER_LE = b'\x01\x03\x00\x00\x20\xE6\x10\x00\x00\x01\x00\x00\x00\x05\x00\x00\x00'
WKB_BBOX_HEADER_BE = b'\x00\x20\x00\x00\x03\x00\x00\x10\xe6\x00\x00\x00\x01\x00\x00\x00\x05'
+
class Bbox:
""" A bounding box in WGS84 projection.
"""
self.coords = (minx, miny, maxx, maxy)
-
@property
def minlat(self) -> float:
""" Southern-most latitude, corresponding to the minimum y coordinate.
"""
return self.coords[1]
-
@property
def maxlat(self) -> float:
""" Northern-most latitude, corresponding to the maximum y coordinate.
"""
return self.coords[3]
-
@property
def minlon(self) -> float:
""" Western-most longitude, corresponding to the minimum x coordinate.
"""
return self.coords[0]
-
@property
def maxlon(self) -> float:
""" Eastern-most longitude, corresponding to the maximum x coordinate.
"""
return self.coords[2]
-
@property
def area(self) -> float:
""" Return the area of the box in WGS84.
"""
return (self.coords[2] - self.coords[0]) * (self.coords[3] - self.coords[1])
-
def contains(self, pt: Point) -> bool:
""" Check if the point is inside or on the boundary of the box.
"""
return self.coords[0] <= pt[0] and self.coords[1] <= pt[1]\
- and self.coords[2] >= pt[0] and self.coords[3] >= pt[1]
-
+ and self.coords[2] >= pt[0] and self.coords[3] >= pt[1]
def to_wkt(self) -> str:
""" Return the WKT representation of the Bbox. This
is a simple polygon with four points.
"""
return 'POLYGON(({0} {1},{0} {3},{2} {3},{2} {1},{0} {1}))'\
- .format(*self.coords) # pylint: disable=consider-using-f-string
-
+ .format(*self.coords)
@staticmethod
def from_wkb(wkb: Union[None, str, bytes]) -> 'Optional[Bbox]':
return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
-
@staticmethod
def from_point(pt: Point, buffer: float) -> 'Bbox':
""" Return a Bbox around the point with the buffer added to all sides.
return Bbox(pt[0] - buffer, pt[1] - buffer,
pt[0] + buffer, pt[1] + buffer)
-
@staticmethod
def from_param(inp: Any) -> 'Bbox':
""" Return a Bbox from an input parameter. The box may be
"""
return categories
-TParam = TypeVar('TParam', bound='LookupDetails') # pylint: disable=invalid-name
+
+TParam = TypeVar('TParam', bound='LookupDetails')
+
@dataclasses.dataclass
class LookupDetails:
else field.default
if field.metadata and 'transform' in field.metadata:
return field.metadata['transform'](v)
- if not isinstance(v, field.type): # type: ignore[arg-type]
+ if not isinstance(v, field.type): # type: ignore[arg-type]
raise UsageError(f"Parameter '{field.name}' needs to be of {field.type!s}.")
return v
class ReverseDetails(LookupDetails):
""" Collection of parameters for the reverse call.
"""
+
max_rank: int = dataclasses.field(default=30,
- metadata={'transform': lambda v: max(0, min(v, 30))}
- )
+ metadata={'transform': lambda v: max(0, min(v, 30))})
""" Highest address rank to return.
"""
+
layers: DataLayer = DataLayer.ADDRESS | DataLayer.POI
""" Filter which kind of data to include.
"""
+
@dataclasses.dataclass
class SearchDetails(LookupDetails):
""" Collection of parameters for the search call.
""" Maximum number of results to be returned. The actual number of results
may be less.
"""
+
min_rank: int = dataclasses.field(default=0,
- metadata={'transform': lambda v: max(0, min(v, 30))}
- )
+ metadata={'transform': lambda v: max(0, min(v, 30))})
""" Lowest address rank to return.
"""
+
max_rank: int = dataclasses.field(default=30,
- metadata={'transform': lambda v: max(0, min(v, 30))}
- )
+ metadata={'transform': lambda v: max(0, min(v, 30))})
""" Highest address rank to return.
"""
+
layers: Optional[DataLayer] = dataclasses.field(default=None,
- metadata={'transform': lambda r : r})
+ metadata={'transform': lambda r: r})
""" Filter which kind of data to include. When 'None' (the default) then
filtering by layers is disabled.
"""
+
countries: List[str] = dataclasses.field(default_factory=list,
metadata={'transform': format_country})
""" Restrict search results to the given countries. An empty list (the
default) will disable this filter.
"""
+
excluded: List[int] = dataclasses.field(default_factory=list,
metadata={'transform': format_excluded})
""" List of OSM objects to exclude from the results. Currently only
works when the internal place ID is given.
An empty list (the default) will disable this filter.
"""
+
viewbox: Optional[Bbox] = dataclasses.field(default=None,
metadata={'transform': Bbox.from_param})
""" Focus the search on a given map area.
"""
+
bounded_viewbox: bool = False
""" Use 'viewbox' as a filter and restrict results to places within the
given area.
"""
+
near: Optional[Point] = dataclasses.field(default=None,
metadata={'transform': Point.from_param})
""" Order results by distance to the given point.
"""
+
near_radius: Optional[float] = dataclasses.field(default=None,
- metadata={'transform': lambda r : r})
+ metadata={'transform': lambda r: r})
""" Use near point as a filter and drop results outside the given
radius. The radius is given in WGS84 degrees.
"""
+
categories: List[Tuple[str, str]] = dataclasses.field(default_factory=list,
metadata={'transform': format_categories})
""" Restrict search to places with one of the given class/type categories.
An empty list (the default) will disable this filter.
"""
+
viewbox_x2: Optional[Bbox] = None
def __post_init__(self) -> None:
self.viewbox_x2 = Bbox(self.viewbox.minlon - xext, self.viewbox.minlat - yext,
self.viewbox.maxlon + xext, self.viewbox.maxlat + yext)
-
def restrict_min_max_rank(self, new_min: int, new_max: int) -> None:
""" Change the min_rank and max_rank fields to respect the
given boundaries.
self.min_rank = max(self.min_rank, new_min)
self.max_rank = min(self.max_rank, new_max)
-
def is_impossible(self) -> bool:
""" Check if the parameter configuration is contradictionary and
cannot yield any results.
or (self.max_rank <= 4 and
self.layers is not None and not self.layers & DataLayer.ADDRESS))
-
def layer_enabled(self, layer: DataLayer) -> bool:
""" Check if the given layer has been chosen. Also returns
true when layer restriction has been disabled completely.
"""
from typing import Union, TYPE_CHECKING
-# pylint: disable=missing-class-docstring,useless-import-alias
+# flake8: noqa
# SQLAlchemy introduced generic types in version 2.0 making typing
# incompatible with older versions. Add wrappers here so we don't have
try:
import ujson as json
except ModuleNotFoundError:
- import json # type: ignore[no-redef]
+ import json # type: ignore[no-redef]
+
+T = TypeVar('T')
-T = TypeVar('T') # pylint: disable=invalid-name
class JsonWriter:
""" JSON encoder that renders the output directly into an output
self.data = io.StringIO()
self.pending = ''
-
def __call__(self) -> str:
""" Return the rendered JSON content as a string.
The writer remains usable after calling this function.
self.pending = ''
return self.data.getvalue()
-
def start_object(self) -> 'JsonWriter':
""" Write the open bracket of a JSON object.
"""
self.pending = '{'
return self
-
def end_object(self) -> 'JsonWriter':
""" Write the closing bracket of a JSON object.
"""
self.pending = '}'
return self
-
def start_array(self) -> 'JsonWriter':
""" Write the opening bracket of a JSON array.
"""
self.pending = '['
return self
-
def end_array(self) -> 'JsonWriter':
""" Write the closing bracket of a JSON array.
"""
self.pending = ']'
return self
-
def key(self, name: str) -> 'JsonWriter':
""" Write the key string of a JSON object.
"""
self.pending = ':'
return self
-
def value(self, value: Any) -> 'JsonWriter':
""" Write out a value as JSON. The function uses the json.dumps()
function for encoding the JSON. Thus any value that can be
"""
return self.raw(json.dumps(value, ensure_ascii=False))
-
def float(self, value: float, precision: int) -> 'JsonWriter':
""" Write out a float value with the given precision.
"""
self.pending = ','
return self
-
def raw(self, raw_json: str) -> 'JsonWriter':
""" Write out the given value as is. This function is useful if
a value is already available in JSON format.
self.data.write(raw_json)
return self
-
def keyval(self, key: str, value: Any) -> 'JsonWriter':
""" Write out an object element with the given key and value.
This is a shortcut for calling 'key()', 'value()' and 'next()'.
self.value(value)
return self.next()
-
def keyval_not_none(self, key: str, value: Optional[T],
transform: Optional[Callable[[T], Any]] = None) -> 'JsonWriter':
""" Write out an object element only if the value is not None.
Implementation of API version v1 (aka the legacy version).
"""
-#pylint: disable=useless-import-alias
-
from .server_glue import ROUTES as ROUTES
from ..results import ReverseResult, SearchResult
from ..types import Bbox
+
def get_label_tag(category: Tuple[str, str], extratags: Optional[Mapping[str, str]],
rank: int, country: Optional[str]) -> str:
""" Create a label tag for the given place that can be used as an XML name.
label = category[1] if category[1] != 'yes' else category[0]
elif rank < 28:
label = 'road'
- elif category[0] == 'place'\
- and category[1] in ('house_number', 'house_name', 'country_code'):
+ elif (category[0] == 'place'
+ and category[1] in ('house_number', 'house_name', 'country_code')):
label = category[1]
else:
label = category[0]
from .. import logging as loglib
from ..server import content_types as ct
+
class RawDataList(List[Dict[str, Any]]):
""" Data type for formatting raw data lists 'as is' in json.
"""
+
dispatch = FormatDispatcher({'text': ct.CONTENT_TEXT,
'xml': ct.CONTENT_XML,
'debug': ct.CONTENT_HTML})
+
@dispatch.error_format_func
def _format_error(content_type: str, msg: str, status: int) -> str:
if content_type == ct.CONTENT_XML:
out = JsonWriter()
out.start_object()\
- .keyval('status', result.status)\
- .keyval('message', result.message)\
- .keyval_not_none('data_updated', result.data_updated,
- lambda v: v.isoformat())\
- .keyval('software_version', str(result.software_version))\
- .keyval_not_none('database_version', result.database_version, str)\
- .end_object()
+ .keyval('status', result.status)\
+ .keyval('message', result.message)\
+ .keyval_not_none('data_updated', result.data_updated,
+ lambda v: v.isoformat())\
+ .keyval('software_version', str(result.software_version))\
+ .keyval_not_none('database_version', result.database_version, str)\
+ .end_object()
return out()
writer.key('hierarchy').start_object()
for group, grouped in data.items():
writer.key(group).start_array()
- grouped.sort() # sorts alphabetically by local name
+ grouped.sort() # sorts alphabetically by local name
for line in grouped:
writer.raw(line).next()
writer.end_array().next()
out = JsonWriter()
out.start_object()\
- .keyval_not_none('place_id', result.place_id)\
- .keyval_not_none('parent_place_id', result.parent_place_id)
+ .keyval_not_none('place_id', result.place_id)\
+ .keyval_not_none('parent_place_id', result.parent_place_id)
if result.osm_object is not None:
out.keyval('osm_type', result.osm_object[0])\
.keyval('osm_id', result.osm_object[1])
out.keyval('category', result.category[0])\
- .keyval('type', result.category[1])\
- .keyval('admin_level', result.admin_level)\
- .keyval('localname', result.locale_name or '')\
- .keyval('names', result.names or {})\
- .keyval('addresstags', result.address or {})\
- .keyval_not_none('housenumber', result.housenumber)\
- .keyval_not_none('calculated_postcode', result.postcode)\
- .keyval_not_none('country_code', result.country_code)\
- .keyval_not_none('indexed_date', result.indexed_date, lambda v: v.isoformat())\
- .keyval_not_none('importance', result.importance)\
- .keyval('calculated_importance', result.calculated_importance())\
- .keyval('extratags', result.extratags or {})\
- .keyval_not_none('calculated_wikipedia', result.wikipedia)\
- .keyval('rank_address', result.rank_address)\
- .keyval('rank_search', result.rank_search)\
- .keyval('isarea', 'Polygon' in (geom or result.geometry.get('type') or ''))\
- .key('centroid').raw(centroid).next()\
- .key('geometry').raw(geom or centroid).next()
+ .keyval('type', result.category[1])\
+ .keyval('admin_level', result.admin_level)\
+ .keyval('localname', result.locale_name or '')\
+ .keyval('names', result.names or {})\
+ .keyval('addresstags', result.address or {})\
+ .keyval_not_none('housenumber', result.housenumber)\
+ .keyval_not_none('calculated_postcode', result.postcode)\
+ .keyval_not_none('country_code', result.country_code)\
+ .keyval_not_none('indexed_date', result.indexed_date, lambda v: v.isoformat())\
+ .keyval_not_none('importance', result.importance)\
+ .keyval('calculated_importance', result.calculated_importance())\
+ .keyval('extratags', result.extratags or {})\
+ .keyval_not_none('calculated_wikipedia', result.wikipedia)\
+ .keyval('rank_address', result.rank_address)\
+ .keyval('rank_search', result.rank_search)\
+ .keyval('isarea', 'Polygon' in (geom or result.geometry.get('type') or ''))\
+ .key('centroid').raw(centroid).next()\
+ .key('geometry').raw(geom or centroid).next()
if options.get('icon_base_url', None):
icon = ICONS.get(result.category)
extra)
-
@dispatch.format_func(SearchResults, 'geojson')
def _format_search_geojson(results: SearchResults,
- options: Mapping[str, Any]) -> str:
+ options: Mapping[str, Any]) -> str:
return format_json.format_base_geojson(results, options, False)
@dispatch.format_func(SearchResults, 'geocodejson')
def _format_search_geocodejson(results: SearchResults,
- options: Mapping[str, Any]) -> str:
+ options: Mapping[str, Any]) -> str:
return format_json.format_base_geocodejson(results, options, False)
@dispatch.format_func(SearchResults, 'json')
def _format_search_json(results: SearchResults,
- options: Mapping[str, Any]) -> str:
+ options: Mapping[str, Any]) -> str:
return format_json.format_base_json(results, options, False,
class_label='class')
@dispatch.format_func(SearchResults, 'jsonv2')
def _format_search_jsonv2(results: SearchResults,
- options: Mapping[str, Any]) -> str:
+ options: Mapping[str, Any]) -> str:
return format_json.format_base_json(results, options, False,
class_label='category')
+
@dispatch.format_func(RawDataList, 'json')
def _format_raw_data_json(results: RawDataList, _: Mapping[str, Any]) -> str:
out = JsonWriter()
out.start_object()
for k, v in res.items():
if isinstance(v, dt.datetime):
- out.keyval(k, v.isoformat(sep= ' ', timespec='seconds'))
+ out.keyval(k, v.isoformat(sep=' ', timespec='seconds'))
else:
out.keyval(k, v)
out.end_object().next()
from ..results import AddressLines, ReverseResults, SearchResults
from . import classtypes as cl
-#pylint: disable=too-many-branches
def _write_osm_id(out: JsonWriter, osm_object: Optional[Tuple[str, int]]) -> None:
if osm_object is not None:
def _write_typed_address(out: JsonWriter, address: Optional[AddressLines],
- country_code: Optional[str]) -> None:
+ country_code: Optional[str]) -> None:
parts = {}
for line in (address or []):
if line.isaddress:
out.keyval('postcode', line.local_name)
elif line.category[1] == 'house_number':
out.keyval('housenumber', line.local_name)
- elif (obj_place_id is None or obj_place_id != line.place_id) \
- and line.rank_address >= 4 and line.rank_address < 28:
+ elif ((obj_place_id is None or obj_place_id != line.place_id)
+ and line.rank_address >= 4 and line.rank_address < 28):
rank_name = GEOCODEJSON_RANKS[line.rank_address]
if rank_name not in extra:
extra[rank_name] = line.local_name
-
for k, v in extra.items():
out.keyval(k, v)
_write_osm_id(out, result.osm_object)
out.keyval('lat', f"{result.centroid.lat}")\
- .keyval('lon', f"{result.centroid.lon}")\
- .keyval(class_label, result.category[0])\
- .keyval('type', result.category[1])\
- .keyval('place_rank', result.rank_search)\
- .keyval('importance', result.calculated_importance())\
- .keyval('addresstype', cl.get_label_tag(result.category, result.extratags,
- result.rank_address,
- result.country_code))\
- .keyval('name', result.locale_name or '')\
- .keyval('display_name', result.display_name or '')
-
+ .keyval('lon', f"{result.centroid.lon}")\
+ .keyval(class_label, result.category[0])\
+ .keyval('type', result.category[1])\
+ .keyval('place_rank', result.rank_search)\
+ .keyval('importance', result.calculated_importance())\
+ .keyval('addresstype', cl.get_label_tag(result.category, result.extratags,
+ result.rank_address,
+ result.country_code))\
+ .keyval('name', result.locale_name or '')\
+ .keyval('display_name', result.display_name or '')
if options.get('icon_base_url', None):
icon = cl.ICONS.get(result.category)
bbox = cl.bbox_from_result(result)
out.key('boundingbox').start_array()\
- .value(f"{bbox.minlat:0.7f}").next()\
- .value(f"{bbox.maxlat:0.7f}").next()\
- .value(f"{bbox.minlon:0.7f}").next()\
- .value(f"{bbox.maxlon:0.7f}").next()\
+ .value(f"{bbox.minlat:0.7f}").next()\
+ .value(f"{bbox.maxlat:0.7f}").next()\
+ .value(f"{bbox.minlon:0.7f}").next()\
+ .value(f"{bbox.maxlon:0.7f}").next()\
.end_array().next()
if result.geometry:
out = JsonWriter()
out.start_object()\
- .keyval('type', 'FeatureCollection')\
- .keyval('licence', cl.OSM_ATTRIBUTION)\
- .key('features').start_array()
+ .keyval('type', 'FeatureCollection')\
+ .keyval('licence', cl.OSM_ATTRIBUTION)\
+ .key('features').start_array()
for result in results:
out.start_object()\
if options.get('namedetails', False):
out.keyval('namedetails', result.names)
- out.end_object().next() # properties
+ out.end_object().next() # properties
out.key('bbox').start_array()
for coord in cl.bbox_from_result(result).coords:
out = JsonWriter()
out.start_object()\
- .keyval('type', 'FeatureCollection')\
- .key('geocoding').start_object()\
- .keyval('version', '0.1.0')\
- .keyval('attribution', cl.OSM_ATTRIBUTION)\
- .keyval('licence', 'ODbL')\
- .keyval_not_none('query', options.get('query'))\
- .end_object().next()\
- .key('features').start_array()
+ .keyval('type', 'FeatureCollection')\
+ .key('geocoding').start_object()\
+ .keyval('version', '0.1.0')\
+ .keyval('attribution', cl.OSM_ATTRIBUTION)\
+ .keyval('licence', 'ODbL')\
+ .keyval_not_none('query', options.get('query'))\
+ .end_object().next()\
+ .key('features').start_array()
for result in results:
out.start_object()\
.keyval('type', 'Feature')\
.key('properties').start_object()\
- .key('geocoding').start_object()
+ .key('geocoding').start_object()
out.keyval_not_none('place_id', result.place_id)
SearchResult, SearchResults
from . import classtypes as cl
-#pylint: disable=too-many-branches
def _write_xml_address(root: ET.Element, address: AddressLines,
country_code: Optional[str]) -> None:
if line.names and 'ISO3166-2' in line.names and line.admin_level:
parts[f"ISO3166-2-lvl{line.admin_level}"] = line.names['ISO3166-2']
- for k,v in parts.items():
+ for k, v in parts.items():
ET.SubElement(root, k).text = v
if country_code:
if options.get('namedetails', False):
eroot = ET.SubElement(root if simple else place, 'namedetails')
if result.names:
- for k,v in result.names.items():
+ for k, v in result.names.items():
ET.SubElement(eroot, 'name', attrib={'desc': k}).text = v
return '<?xml version="1.0" encoding="UTF-8" ?>\n' + ET.tostring(root, encoding='unicode')
from ..results import SearchResult, SearchResults, SourceTable
from ..types import SearchDetails, GeometryFormat
+
REVERSE_MAX_RANKS = [2, 2, 2, # 0-2 Continent/Sea
4, 4, # 3-4 Country
8, # 5 State
26, # 16 Major Streets
27, # 17 Minor Streets
30 # 18 Building
- ]
+ ]
def zoom_to_rank(zoom: int) -> int:
return FEATURE_TYPE_TO_RANK.get(feature_type, (0, 30))
-#pylint: disable=too-many-arguments,too-many-branches
def extend_query_parts(queryparts: Dict[str, Any], details: Dict[str, Any],
feature_type: Optional[str],
namedetails: bool, extratags: bool,
and result.names.get('ref') == postcode
-def _deg(axis:str) -> str:
+def _deg(axis: str) -> str:
return f"(?P<{axis}_deg>\\d+\\.\\d+)°?"
+
def _deg_min(axis: str) -> str:
return f"(?P<{axis}_deg>\\d+)[°\\s]+(?P<{axis}_min>[\\d.]+)[′']*"
+
def _deg_min_sec(axis: str) -> str:
return f"(?P<{axis}_deg>\\d+)[°\\s]+(?P<{axis}_min>\\d+)[′'\\s]+(?P<{axis}_sec>[\\d.]+)[\"″]*"
+
COORD_REGEX = [re.compile(r'(?:(?P<pre>.*?)\s+)??' + r + r'(?:\s+(?P<post>.*))?') for r in (
r"(?P<ns>[NS])\s*" + _deg('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg('lon'),
_deg('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg('lon') + r"\s*(?P<ew>[EW])",
r"\[?(?P<lat_deg>[+-]?\d+\.\d+)[\s,]+(?P<lon_deg>[+-]?\d+\.\d+)\]?"
)]
+
def extract_coords_from_query(query: str) -> Tuple[str, Optional[float], Optional[float]]:
""" Look for something that is formatted like a coordinate at the
beginning or end of the query. If found, extract the coordinate and
CATEGORY_REGEX = re.compile(r'(?P<pre>.*?)\[(?P<cls>[a-zA-Z_]+)=(?P<typ>[a-zA-Z_]+)\](?P<post>.*)')
+
def extract_category_from_query(query: str) -> Tuple[str, Optional[str], Optional[str]]:
""" Extract a hidden category specification of the form '[key=value]' from
the query. If found, extract key and value and
from ..server import content_types as ct
from ..server.asgi_adaptor import ASGIAdaptor
+
def build_response(adaptor: ASGIAdaptor, output: str, status: int = 200,
num_results: int = 0) -> Any:
""" Create a response from the given output. Wraps a JSONP function
""" Return the accepted languages.
"""
return adaptor.get('accept-language')\
- or adaptor.get_header('accept-language')\
- or adaptor.config().DEFAULT_LANGUAGE
+ or adaptor.get_header('accept-language')\
+ or adaptor.config().DEFAULT_LANGUAGE
def setup_debugging(adaptor: ASGIAdaptor) -> bool:
if not formatting.supports_format(result_type, fmt):
adaptor.raise_error("Parameter 'format' must be one of: " +
- ', '.join(formatting.list_formats(result_type)))
+ ', '.join(formatting.list_formats(result_type)))
adaptor.content_type = formatting.get_content_type(fmt)
return fmt
return {'address_details': True,
'geometry_simplification': adaptor.get_float('polygon_threshold', 0.0),
'geometry_output': output
- }
+ }
async def status_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
status_code = 200
return build_response(params, params.formatting().format_result(result, fmt, {}),
- status=status_code)
+ status=status_code)
async def details_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
linked_places=params.get_bool('linkedplaces', True),
parented_places=params.get_bool('hierarchy', False),
keywords=params.get_bool('keywords', False),
- geometry_output = GeometryFormat.GEOJSON
- if params.get_bool('polygon_geojson', False)
- else GeometryFormat.NONE,
+ geometry_output=(GeometryFormat.GEOJSON
+ if params.get_bool('polygon_geojson', False)
+ else GeometryFormat.NONE),
locales=locales
- )
+ )
if debug:
return build_response(params, loglib.get_and_disable())
if result is None:
params.raise_error('No place with that OSM ID found.', status=404)
- output = params.formatting().format_result(result, fmt,
- {'locales': locales,
- 'group_hierarchy': params.get_bool('group_hierarchy', False),
- 'icon_base_url': params.config().MAPICON_URL})
+ output = params.formatting().format_result(
+ result, fmt,
+ {'locales': locales,
+ 'group_hierarchy': params.get_bool('group_hierarchy', False),
+ 'icon_base_url': params.config().MAPICON_URL})
return build_response(params, output, num_results=1)
async def _unstructured_search(query: str, api: NominatimAPIAsync,
- details: Dict[str, Any]) -> SearchResults:
+ details: Dict[str, Any]) -> SearchResults:
if not query:
return SearchResults()
debug = setup_debugging(params)
details = parse_geometry_details(params, fmt)
- details['countries'] = params.get('countrycodes', None)
+ details['countries'] = params.get('countrycodes', None)
details['excluded'] = params.get('exclude_place_ids', None)
details['viewbox'] = params.get('viewbox', None) or params.get('viewboxlbrt', None)
details['bounded_viewbox'] = params.get_bool('bounded', False)
details['dedupe'] = params.get_bool('dedupe', True)
max_results = max(1, min(50, params.get_int('limit', 10)))
- details['max_results'] = max_results + min(10, max_results) \
- if details['dedupe'] else max_results
+ details['max_results'] = (max_results + min(10, max_results)
+ if details['dedupe'] else max_results)
details['min_rank'], details['max_rank'] = \
helpers.feature_type_to_rank(params.get('featureType', ''))
LOG = logging.getLogger()
+
class CommandlineParser:
""" Wraps some of the common functions for parsing the command line
and setting up subcommands.
group.add_argument('-j', '--threads', metavar='NUM', type=int,
help='Number of parallel threads to use')
-
def nominatim_version_text(self) -> str:
""" Program name and version number as string
"""
text += f' ({version.GIT_COMMIT_HASH})'
return text
-
def add_subcommand(self, name: str, cmd: Subcommand) -> None:
""" Add a subcommand to the parser. The subcommand must be a class
with a function add_args() that adds the parameters for the
parser.set_defaults(command=cmd)
cmd.add_args(parser)
-
def run(self, **kwargs: Any) -> int:
""" Parse the command line arguments of the program and execute the
appropriate subcommand.
return ret
except UsageError as exception:
if log.isEnabledFor(logging.DEBUG):
- raise # use Python's exception printing
+ raise # use Python's exception printing
log.fatal('FATAL: %s', exception)
# If we get here, then execution has failed in some way.
# a subcommand.
#
# No need to document the functions each time.
-# pylint: disable=C0111
class AdminServe:
"""\
Start a simple web server for serving the API.
choices=('falcon', 'starlette'),
help='Webserver framework to run. (default: falcon)')
-
def run(self, args: NominatimArgs) -> int:
asyncio.run(self.run_uvicorn(args))
return 0
-
async def run_uvicorn(self, args: NominatimArgs) -> None:
- import uvicorn # pylint: disable=import-outside-toplevel
+ import uvicorn
server_info = args.server.split(':', 1)
host = server_info[0]
parser.add_subcommand('details', apicmd.APIDetails())
parser.add_subcommand('status', apicmd.APIStatus())
except ModuleNotFoundError as ex:
- if not ex.name or 'nominatim_api' not in ex.name: # pylint: disable=E1135
+ if not ex.name or 'nominatim_api' not in ex.name:
raise ex
parser.parser.epilog = \
'\n export, convert, serve, search, reverse, lookup, details, status'\
"\n\nRun 'pip install nominatim-api' to install the package."
-
return parser
from ..db.connection import connect
from ..tools.freeze import is_frozen
-# Do not repeat documentation of subcommand classes.
-# pylint: disable=C0111
-# Using non-top-level imports to avoid eventually unused imports.
-# pylint: disable=E0012,C0415
LOG = logging.getLogger()
+
class UpdateAddData:
"""\
Add additional data from a file or an online source.
group2.add_argument('--socket-timeout', dest='socket_timeout', type=int, default=60,
help='Set timeout for file downloads')
-
def run(self, args: NominatimArgs) -> int:
from ..tools import add_osm_data
return 0
-
async def _add_tiger_data(self, args: NominatimArgs) -> int:
from ..tokenizer import factory as tokenizer_factory
from ..tools import tiger_data
tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
return await tiger_data.add_tiger_data(args.tiger_data,
args.config,
- args.threads or psutil.cpu_count() or 1,
+ args.threads or psutil.cpu_count() or 1,
tokenizer)
mgroup.add_argument('--place-id', type=int,
help='Analyse indexing of the given Nominatim object')
-
def run(self, args: NominatimArgs) -> int:
-        # pylint: disable=too-many-return-statements
if args.warm:
return 1
-
def _warm(self, args: NominatimArgs) -> int:
try:
import nominatim_api as napi
from ..errors import UsageError
from .args import NominatimArgs
-# Do not repeat documentation of subcommand classes.
-# pylint: disable=C0111
LOG = logging.getLogger()
+
STRUCTURED_QUERY = (
('amenity', 'name and/or type of POI'),
('street', 'housenumber and street'),
('postalcode', 'postcode')
)
+
EXTRADATA_PARAMS = (
('addressdetails', 'Include a breakdown of the address into elements'),
('extratags', ("Include additional information if available "
('namedetails', 'Include a list of alternative names')
)
+
def _add_list_format(parser: argparse.ArgumentParser) -> None:
group = parser.add_argument_group('Other options')
group.add_argument('--list-formats', action='store_true',
group.add_argument('--polygon-output',
choices=['geojson', 'kml', 'svg', 'text'],
help='Output geometry of results as a GeoJSON, KML, SVG or WKT')
- group.add_argument('--polygon-threshold', type=float, default = 0.0,
+ group.add_argument('--polygon-threshold', type=float, default=0.0,
metavar='TOLERANCE',
help=("Simplify output geometry."
"Parameter is difference tolerance in degrees."))
help='Do not remove duplicates from the result list')
_add_list_format(parser)
-
def run(self, args: NominatimArgs) -> int:
formatter = napi.load_format_dispatcher('v1', args.project_dir)
try:
with napi.NominatimAPI(args.project_dir) as api:
params: Dict[str, Any] = {'max_results': args.limit + min(args.limit, 10),
- 'address_details': True, # needed for display name
+ 'address_details': True, # needed for display name
'geometry_output': _get_geometry_output(args),
'geometry_simplification': args.polygon_threshold,
'countries': args.countrycodes,
'viewbox': args.viewbox,
'bounded_viewbox': args.bounded,
'locales': _get_locales(args, api.config.DEFAULT_LANGUAGE)
- }
+ }
if args.query:
results = api.search(args.query, **params)
_add_api_output_arguments(parser)
_add_list_format(parser)
-
def run(self, args: NominatimArgs) -> int:
formatter = napi.load_format_dispatcher('v1', args.project_dir)
result = api.reverse(napi.Point(args.lon, args.lat),
max_rank=zoom_to_rank(args.zoom or 18),
layers=layers,
- address_details=True, # needed for display name
+ address_details=True, # needed for display name
geometry_output=_get_geometry_output(args),
geometry_simplification=args.polygon_threshold,
locales=_get_locales(args, api.config.DEFAULT_LANGUAGE))
return 42
-
class APILookup:
"""\
Execute API lookup query.
_add_api_output_arguments(parser)
_add_list_format(parser)
-
def run(self, args: NominatimArgs) -> int:
formatter = napi.load_format_dispatcher('v1', args.project_dir)
try:
with napi.NominatimAPI(args.project_dir) as api:
results = api.lookup(places,
- address_details=True, # needed for display name
+ address_details=True, # needed for display name
geometry_output=_get_geometry_output(args),
geometry_simplification=args.polygon_threshold or 0.0,
locales=_get_locales(args, api.config.DEFAULT_LANGUAGE))
help='Preferred language order for presenting search results')
_add_list_format(parser)
-
def run(self, args: NominatimArgs) -> int:
formatter = napi.load_format_dispatcher('v1', args.project_dir)
place = napi.OsmID('W', args.way, args.object_class)
elif args.relation:
place = napi.OsmID('R', args.relation, args.object_class)
- elif args.place_id is not None:
+ elif args.place_id is not None:
place = napi.PlaceID(args.place_id)
else:
raise UsageError('One of the arguments --node/-n --way/-w '
linked_places=args.linkedplaces,
parented_places=args.hierarchy,
keywords=args.keywords,
- geometry_output=napi.GeometryFormat.GEOJSON
- if args.polygon_geojson
- else napi.GeometryFormat.NONE,
- locales=locales)
+ geometry_output=(napi.GeometryFormat.GEOJSON
+ if args.polygon_geojson
+ else napi.GeometryFormat.NONE),
+ locales=locales)
except napi.UsageError as ex:
raise UsageError(ex) from ex
help='Format of result (use --list-formats to see supported formats)')
_add_list_format(parser)
-
def run(self, args: NominatimArgs) -> int:
formatter = napi.load_format_dispatcher('v1', args.project_dir)
from ..config import Configuration
from ..typing import Protocol
+
LOG = logging.getLogger()
+
class Subcommand(Protocol):
"""
Interface to be implemented by classes implementing a CLI subcommand.
polygon_geojson: bool
group_hierarchy: bool
-
def osm2pgsql_options(self, default_cache: int,
default_threads: int) -> Dict[str, Any]:
""" Return the standard osm2pgsql options that can be derived
slim_index=self.config.TABLESPACE_OSM_INDEX,
main_data=self.config.TABLESPACE_PLACE_DATA,
main_index=self.config.TABLESPACE_PLACE_INDEX
- )
- )
-
+ )
+ )
def get_osm_file_list(self) -> Optional[List[Path]]:
""" Return the --osm-file argument as a list of Paths or None
from ..errors import UsageError
from .args import NominatimArgs
-# Do not repeat documentation of subcommand classes.
-# pylint: disable=C0111
-# Using non-top-level imports to avoid eventually unused imports.
-# pylint: disable=E0012,C0415
class WithAction(argparse.Action):
""" Special action that saves a list of flags, given on the command-line
super().__init__(full_option_strings, argparse.SUPPRESS, nargs=0, **kwargs)
-
def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace,
values: Union[str, Sequence[Any], None],
option_string: Optional[str] = None) -> None:
group.add_argument('--details', action=WithAction, dest_set=self.options, default=True,
help='Enable/disable support for details API (default: enabled)')
-
def run(self, args: NominatimArgs) -> int:
if args.output.exists():
raise UsageError(f"File '{args.output}' already exists. Refusing to overwrite.")
from nominatim_api.results import create_from_placex_row, ReverseResult, add_result_details
from nominatim_api.types import LookupDetails
-import sqlalchemy as sa # pylint: disable=C0411
+import sqlalchemy as sa
from ..errors import UsageError
from .args import NominatimArgs
-# Do not repeat documentation of subcommand classes.
-# pylint: disable=C0111
-# Using non-top-level imports to avoid eventually unused imports.
-# pylint: disable=E0012,C0415
-# Needed for SQLAlchemy
-# pylint: disable=singleton-comparison
LOG = logging.getLogger()
+
RANK_RANGE_MAP = {
'country': (4, 4),
'state': (5, 9),
'path': (27, 27)
}
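# Illustrative note (not part of the original change): this map presumably
# translates the requested output type into the (min, max) address-rank window
# used for the export query below, e.g. 'state' selects ranks 5 to 9.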
+
RANK_TO_OUTPUT_MAP = {
4: 'country',
5: 'state', 6: 'state', 7: 'state', 8: 'state', 9: 'state',
17: 'suburb', 18: 'suburb', 19: 'suburb', 20: 'suburb', 21: 'suburb',
26: 'street', 27: 'path'}
+
class QueryExport:
"""\
    Export places as a CSV file from the database.
dest='relation',
help='Export only children of this OSM relation')
-
def run(self, args: NominatimArgs) -> int:
return asyncio.run(export(args))
t = conn.t.placex
sql = sa.select(t.c.place_id, t.c.parent_place_id,
- t.c.osm_type, t.c.osm_id, t.c.name,
- t.c.class_, t.c.type, t.c.admin_level,
- t.c.address, t.c.extratags,
- t.c.housenumber, t.c.postcode, t.c.country_code,
- t.c.importance, t.c.wikipedia, t.c.indexed_date,
- t.c.rank_address, t.c.rank_search,
- t.c.centroid)\
- .where(t.c.linked_place_id == None)\
- .where(t.c.rank_address.between(*output_range))
+ t.c.osm_type, t.c.osm_id, t.c.name,
+ t.c.class_, t.c.type, t.c.admin_level,
+ t.c.address, t.c.extratags,
+ t.c.housenumber, t.c.postcode, t.c.country_code,
+ t.c.importance, t.c.wikipedia, t.c.indexed_date,
+ t.c.rank_address, t.c.rank_search,
+ t.c.centroid)\
+ .where(t.c.linked_place_id == None)\
+ .where(t.c.rank_address.between(*output_range))
parent_place_id = await get_parent_id(conn, args.node, args.way, args.relation)
if parent_place_id:
await add_result_details(conn, results,
LookupDetails(address_details=True, locales=locale))
-
for result in results:
data = {'placeid': result.place_id,
'postcode': result.postcode}
from ..db.connection import connect
from .args import NominatimArgs
-# Do not repeat documentation of subcommand classes.
-# pylint: disable=C0111
-# Using non-top-level imports to avoid eventually unused imports.
-# pylint: disable=E0012,C0415
class SetupFreeze:
"""\
"""
def add_args(self, parser: argparse.ArgumentParser) -> None:
- pass # No options
-
+ pass # No options
def run(self, args: NominatimArgs) -> int:
from ..tools import freeze
from ..db.connection import connect
from .args import NominatimArgs
-# Do not repeat documentation of subcommand classes.
-# pylint: disable=C0111
-# Using non-top-level imports to avoid eventually unused imports.
-# pylint: disable=E0012,C0415
-
class UpdateIndex:
"""\
group.add_argument('--maxrank', '-R', type=int, metavar='RANK', default=30,
help='Maximum/finishing rank')
-
def run(self, args: NominatimArgs) -> int:
asyncio.run(self._do_index(args))
return 0
-
async def _do_index(self, args: NominatimArgs) -> None:
from ..tokenizer import factory as tokenizer_factory
indexer = Indexer(args.config.get_libpq_dsn(), tokenizer,
args.threads or psutil.cpu_count() or 1)
- has_pending = True # run at least once
+ has_pending = True # run at least once
while has_pending:
if not args.no_boundaries:
await indexer.index_boundaries(args.minrank, args.maxrank)
from ..tokenizer.base import AbstractTokenizer
from .args import NominatimArgs
-# Do not repeat documentation of subcommand classes.
-# pylint: disable=C0111
-# Using non-top-level imports to avoid eventually unused imports.
-# pylint: disable=E0012,C0415
LOG = logging.getLogger()
+
def _parse_osm_object(obj: str) -> Tuple[str, int]:
""" Parse the given argument into a tuple of OSM type and ID.
Raises an ArgumentError if the format is not recognized.
group.add_argument('--enable-debug-statements', action='store_true',
help='Enable debug warning statements in functions')
-
- def run(self, args: NominatimArgs) -> int: #pylint: disable=too-many-branches, too-many-statements
+ def run(self, args: NominatimArgs) -> int:
from ..tools import refresh, postcodes
from ..indexer.indexer import Indexer
LOG.warning('Import secondary importance raster data from %s', args.project_dir)
if refresh.import_secondary_importance(args.config.get_libpq_dsn(),
- args.project_dir) > 0:
+ args.project_dir) > 0:
LOG.fatal('FATAL: Cannot update secondary importance raster data')
return 1
need_function_refresh = True
return 0
-
def _get_tokenizer(self, config: Configuration) -> AbstractTokenizer:
if self.tokenizer is None:
from ..tokenizer import factory as tokenizer_factory
LOG = logging.getLogger()
-# Do not repeat documentation of subcommand classes.
-# pylint: disable=C0111
-# Using non-top-level imports to make pyosmium optional for replication only.
-# pylint: disable=C0415
class UpdateReplication:
"""\
group.add_argument('--socket-timeout', dest='socket_timeout', type=int, default=60,
help='Set timeout for file downloads')
-
def _init_replication(self, args: NominatimArgs) -> int:
from ..tools import replication, refresh
refresh.create_functions(conn, args.config, True, False)
return 0
-
def _check_for_updates(self, args: NominatimArgs) -> int:
from ..tools import replication
return replication.check_for_updates(conn, base_url=args.config.REPLICATION_URL,
socket_timeout=args.socket_timeout)
-
def _report_update(self, batchdate: dt.datetime,
start_import: dt.datetime,
start_index: Optional[dt.datetime]) -> None:
round_time(end - start_import),
round_time(end - batchdate))
-
def _compute_update_interval(self, args: NominatimArgs) -> int:
if args.catch_up:
return 0
return update_interval
-
async def _update(self, args: NominatimArgs) -> None:
-        # pylint: disable=too-many-locals
from ..tools import replication
LOG.warning("No new changes. Sleeping for %d sec.", recheck_interval)
time.sleep(recheck_interval)
-
def run(self, args: NominatimArgs) -> int:
socket.setdefaulttimeout(args.socket_timeout)
from ..version import NOMINATIM_VERSION
from .args import NominatimArgs
-# Do not repeat documentation of subcommand classes.
-# pylint: disable=C0111
-# Using non-top-level imports to avoid eventually unused imports.
-# pylint: disable=C0415
LOG = logging.getLogger()
+
class SetupAll:
"""\
Create a new Nominatim database from an OSM file.
def add_args(self, parser: argparse.ArgumentParser) -> None:
group1 = parser.add_argument_group('Required arguments')
group1.add_argument('--osm-file', metavar='FILE', action='append',
- help='OSM file to be imported'
- ' (repeat for importing multiple files)',
- default=None)
+ help='OSM file to be imported'
+ ' (repeat for importing multiple files)',
+ default=None)
group1.add_argument('--continue', dest='continue_at',
- choices=['import-from-file', 'load-data', 'indexing', 'db-postprocess'],
- help='Continue an import that was interrupted',
- default=None)
+ choices=['import-from-file', 'load-data', 'indexing', 'db-postprocess'],
+ help='Continue an import that was interrupted',
+ default=None)
group2 = parser.add_argument_group('Optional arguments')
group2.add_argument('--osm2pgsql-cache', metavar='SIZE', type=int,
- help='Size of cache to be used by osm2pgsql (in MB)')
+ help='Size of cache to be used by osm2pgsql (in MB)')
group2.add_argument('--reverse-only', action='store_true',
- help='Do not create tables and indexes for searching')
+ help='Do not create tables and indexes for searching')
group2.add_argument('--no-partitions', action='store_true',
- help=("Do not partition search indices "
- "(speeds up import of single country extracts)"))
+ help="Do not partition search indices "
+ "(speeds up import of single country extracts)")
group2.add_argument('--no-updates', action='store_true',
- help="Do not keep tables that are only needed for "
- "updating the database later")
+ help="Do not keep tables that are only needed for "
+ "updating the database later")
group2.add_argument('--offline', action='store_true',
help="Do not attempt to load any additional data from the internet")
group3 = parser.add_argument_group('Expert options')
group3.add_argument('--ignore-errors', action='store_true',
- help='Continue import even when errors in SQL are present')
+ help='Continue import even when errors in SQL are present')
group3.add_argument('--index-noanalyse', action='store_true',
- help='Do not perform analyse operations during index (expert only)')
+ help='Do not perform analyse operations during index (expert only)')
group3.add_argument('--prepare-database', action='store_true',
help='Create the database but do not import any data')
-
- def run(self, args: NominatimArgs) -> int: # pylint: disable=too-many-statements, too-many-branches
+ def run(self, args: NominatimArgs) -> int:
if args.osm_file is None and args.continue_at is None and not args.prepare_database:
raise UsageError("No input files (use --osm-file).")
return asyncio.run(self.async_run(args))
-
async def async_run(self, args: NominatimArgs) -> int:
from ..data import country_info
from ..tools import database_import, postcodes, freeze
if args.prepare_database or args.continue_at is None:
LOG.warning('Creating database')
database_import.setup_database_skeleton(args.config.get_libpq_dsn(),
- rouser=args.config.DATABASE_WEBUSER)
+ rouser=args.config.DATABASE_WEBUSER)
if args.prepare_database:
return 0
postcodes.update_postcodes(args.config.get_libpq_dsn(),
args.project_dir, tokenizer)
- if args.continue_at in \
- ('import-from-file', 'load-data', 'indexing', None):
+ if args.continue_at in ('import-from-file', 'load-data', 'indexing', None):
LOG.warning('Indexing places')
indexer = Indexer(args.config.get_libpq_dsn(), tokenizer, num_threads)
await indexer.index_full(analyse=not args.index_noanalyse)
return 0
-
def _base_import(self, args: NominatimArgs) -> None:
from ..tools import database_import, refresh
from ..data import country_info
database_import.check_existing_database_plugins(args.config.get_libpq_dsn())
LOG.warning('Setting up country tables')
country_info.setup_country_tables(args.config.get_libpq_dsn(),
- args.config.lib_dir.data,
- args.no_partitions)
+ args.config.lib_dir.data,
+ args.no_partitions)
LOG.warning('Importing OSM data file')
database_import.import_osm_data(files,
LOG.warning('Importing wikipedia importance data')
data_path = Path(args.config.WIKIPEDIA_DATA_PATH or args.project_dir)
if refresh.import_wikipedia_articles(args.config.get_libpq_dsn(),
- data_path) > 0:
+ data_path) > 0:
LOG.error('Wikipedia importance dump file not found. '
- 'Calculating importance values of locations will not '
- 'use Wikipedia importance data.')
+ 'Calculating importance values of locations will not '
+ 'use Wikipedia importance data.')
LOG.warning('Importing secondary importance raster data')
if refresh.import_secondary_importance(args.config.get_libpq_dsn(),
- args.project_dir) != 0:
+ args.project_dir) != 0:
LOG.error('Secondary importance file not imported. '
- 'Falling back to default ranking.')
+ 'Falling back to default ranking.')
self._setup_tables(args.config, args.reverse_only)
-
def _setup_tables(self, config: Configuration, reverse_only: bool) -> None:
""" Set up the basic database layout: tables, indexes and functions.
"""
LOG.warning('Create functions (3rd pass)')
refresh.create_functions(conn, config, False, False)
-
def _get_tokenizer(self, continue_at: Optional[str],
config: Configuration) -> AbstractTokenizer:
""" Set up a new tokenizer or load an already initialised one.
# just load the tokenizer
return tokenizer_factory.get_tokenizer_for_db(config)
-
def _finalize_database(self, dsn: str, offline: bool) -> None:
""" Determine the database date and set the status accordingly.
"""
dbdate = status.compute_database_date(conn, offline)
status.set_status(conn, dbdate)
LOG.info('Database is at %s.', dbdate)
- except Exception as exc: # pylint: disable=broad-except
+ except Exception as exc:
LOG.error('Cannot determine date of database: %s', exc)
from ..tools.special_phrases.sp_csv_loader import SPCsvLoader
from .args import NominatimArgs
+
LOG = logging.getLogger()
-# Do not repeat documentation of subcommand classes.
-# pylint: disable=C0111
-# Using non-top-level imports to avoid eventually unused imports.
-# pylint: disable=E0012,C0415
class ImportSpecialPhrases:
"""\
group.add_argument('--no-replace', action='store_true',
help='Keep the old phrases and only add the new ones')
-
def run(self, args: NominatimArgs) -> int:
if args.import_from_wiki:
return 0
-
def start_import(self, args: NominatimArgs, loader: SpecialPhraseLoader) -> None:
"""
Create the SPImporter object containing the right
from . import paths
LOG = logging.getLogger()
-CONFIG_CACHE : Dict[str, Any] = {}
+CONFIG_CACHE: Dict[str, Any] = {}
+
def flatten_config_list(content: Any, section: str = '') -> List[Any]:
""" Flatten YAML configuration lists that contain include sections
self.lib_dir = _LibDirs()
self._private_plugins: Dict[str, object] = {}
-
def set_libdirs(self, **kwargs: StrPath) -> None:
""" Set paths to library functions and data.
"""
for key, value in kwargs.items():
setattr(self.lib_dir, key, None if value is None else Path(value))
-
def __getattr__(self, name: str) -> str:
name = 'NOMINATIM_' + name
return self._config[name] or ''
-
def get_bool(self, name: str) -> bool:
""" Return the given configuration parameter as a boolean.
"""
return getattr(self, name).lower() in ('1', 'yes', 'true')
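        # Hypothetical example (not in the original source): with a setting
        # NOMINATIM_FOO=yes in the environment or .env file, get_bool('FOO')
        # returns True; any other value (e.g. '0', 'off' or an empty string)
        # yields False.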
-
def get_int(self, name: str) -> int:
""" Return the given configuration parameter as an int.
LOG.fatal("Invalid setting NOMINATIM_%s. Needs to be a number.", name)
raise UsageError("Configuration error.") from exp
-
def get_str_list(self, name: str) -> Optional[List[str]]:
""" Return the given configuration parameter as a list of strings.
            The values are assumed to be given as a comma-separated list and
- will be stripped before returning them.
+ will be stripped before returning them.
Parameters:
name: Name of the configuration parameter with the NOMINATIM_
return [v.strip() for v in raw.split(',')] if raw else None
-
def get_path(self, name: str) -> Optional[Path]:
""" Return the given configuration parameter as a Path.
return cfgpath.resolve()
-
def get_libpq_dsn(self) -> str:
""" Get configured database DSN converted into the key/value format
understood by libpq and psycopg.
return dsn
-
def get_database_params(self) -> Mapping[str, Union[str, int, None]]:
""" Get the configured parameters for the database connection
as a mapping.
return conninfo_to_dict(dsn)
-
def get_import_style_file(self) -> Path:
""" Return the import style file as a path object. Translates the
name of the standard styles automatically into a file in the
return self.find_config_file('', 'IMPORT_STYLE')
-
def get_os_env(self) -> Dict[str, str]:
""" Return a copy of the OS environment with the Nominatim configuration
merged in.
return env
-
def load_sub_configuration(self, filename: StrPath,
config: Optional[str] = None) -> Any:
""" Load additional configuration from a file. `filename` is the name
CONFIG_CACHE[str(configfile)] = result
return result
-
def load_plugin_module(self, module_name: str, internal_path: str) -> Any:
""" Load a Python module as a plugin.
return sys.modules.get(module_name) or importlib.import_module(module_name)
-
def find_config_file(self, filename: StrPath,
config: Optional[str] = None) -> Path:
""" Resolve the location of a configuration file given a filename and
filename = cfg_filename
-
search_paths = [self.project_dir, self.config_dir]
for path in search_paths:
if path is not None and (path / filename).is_file():
filename, search_paths)
raise UsageError("Config file not found.")
-
def _load_from_yaml(self, cfgfile: Path) -> Any:
""" Load a YAML configuration file. This installs a special handler that
            allows other YAML files to be included using the '!include' operator.
Loader=yaml.SafeLoader)
return yaml.safe_load(cfgfile.read_text(encoding='utf-8'))
-
def _yaml_include_representer(self, loader: Any, node: yaml.Node) -> Any:
""" Handler for the '!include' operator in YAML files.
from ..config import Configuration
from ..tokenizer.base import AbstractTokenizer
+
def _flatten_name_list(names: Any) -> Dict[str, str]:
if names is None:
return {}
return flat
-
class _CountryInfo:
""" Caches country-specific properties from the configuration file.
"""
def __init__(self) -> None:
self._info: Dict[str, Dict[str, Any]] = {}
-
def load(self, config: Configuration) -> None:
""" Load the country properties from the configuration files,
if they are not loaded yet.
for x in prop['languages'].split(',')]
prop['names'] = _flatten_name_list(prop.get('names'))
-
def items(self) -> Iterable[Tuple[str, Dict[str, Any]]]:
""" Return tuples of (country_code, property dict) as iterable.
"""
return self._info.get(country_code, {})
-
_COUNTRY_INFO = _CountryInfo()
"""
_COUNTRY_INFO.load(config)
+
@overload
def iterate() -> Iterable[Tuple[str, Dict[str, Any]]]:
...
+
@overload
def iterate(prop: str) -> Iterable[Tuple[str, Any]]:
...
+
def iterate(prop: Optional[str] = None) -> Iterable[Tuple[str, Dict[str, Any]]]:
""" Iterate over country code and properties.
# country names (only in languages as provided)
if name:
- names.update({k : v for k, v in name.items() if _include_key(k)})
+ names.update({k: v for k, v in name.items() if _include_key(k)})
analyzer.add_country_names(code, names)
"""
from typing import Optional, Mapping, Any, Tuple
+
class PlaceInfo:
""" This data class contains all information the tokenizer can access
about a place.
def __init__(self, info: Mapping[str, Any]) -> None:
self._info = info
-
@property
def name(self) -> Optional[Mapping[str, str]]:
""" A dictionary with the names of the place. Keys and values represent
"""
return self._info.get('name')
-
@property
def address(self) -> Optional[Mapping[str, str]]:
""" A dictionary with the address elements of the place. They key
"""
return self._info.get('address')
-
@property
def country_code(self) -> Optional[str]:
""" The country code of the country the place is in. Guaranteed
"""
return self._info.get('country_code')
-
@property
def rank_address(self) -> int:
""" The [rank address][1] before any rank correction is applied.
"""
return self._info.get('rank_address', 0)
-
@property
def centroid(self) -> Optional[Tuple[float, float]]:
""" A center point of the place in WGS84. May be None when the
x, y = self._info.get('centroid_x'), self._info.get('centroid_y')
return None if x is None or y is None else (x, y)
-
def is_a(self, key: str, value: str) -> bool:
""" Set to True when the place's primary tag corresponds to the given
key and value.
"""
return self._info.get('class') == key and self._info.get('type') == value
-
def is_country(self) -> bool:
""" Set to True when the place is a valid country boundary.
"""
return self.rank_address == 4 \
- and self.is_a('boundary', 'administrative') \
- and self.country_code is not None
+ and self.is_a('boundary', 'administrative') \
+ and self.country_code is not None
"""
from typing import Optional, Dict, Mapping
+
class PlaceName:
""" Each name and address part of a place is encapsulated in an object of
this class. It saves not only the name proper but also describes the
self.suffix = suffix
self.attr: Dict[str, str] = {}
-
def __repr__(self) -> str:
return f"PlaceName(name={self.name!r},kind={self.kind!r},suffix={self.suffix!r})"
-
def clone(self, name: Optional[str] = None,
kind: Optional[str] = None,
suffix: Optional[str] = None,
return newobj
-
def set_attr(self, key: str, value: str) -> None:
""" Add the given property to the name. If the property was already
set, then the value is overwritten.
"""
self.attr[key] = value
-
def get_attr(self, key: str, default: Optional[str] = None) -> Optional[str]:
""" Return the given property or the value of 'default' if it
is not set.
"""
return self.attr.get(key, default)
-
def has_attr(self, key: str) -> bool:
""" Check if the given attribute is set.
"""
from ..errors import UsageError
from . import country_info
+
class CountryPostcodeMatcher:
""" Matches and formats a postcode according to a format definition
of the given country.
self.output = config.get('output', r'\g<0>')
-
def match(self, postcode: str) -> Optional[Match[str]]:
""" Match the given postcode against the postcode pattern for this
matcher. Returns a `re.Match` object if the match was successful
return None
-
def normalize(self, match: Match[str]) -> str:
""" Return the default format of the postcode for the given match.
`match` must be a `re.Match` object previously returned by
else:
raise UsageError(f"Invalid entry 'postcode' for country '{ccode}'")
-
def set_default_pattern(self, pattern: str) -> None:
""" Set the postcode match pattern to use, when a country does not
have a specific pattern.
"""
self.default_matcher = CountryPostcodeMatcher('', {'pattern': pattern})
-
def get_matcher(self, country_code: Optional[str]) -> Optional[CountryPostcodeMatcher]:
""" Return the CountryPostcodeMatcher for the given country.
Returns None if the country doesn't have a postcode and the
return self.country_matcher.get(country_code, self.default_matcher)
-
def match(self, country_code: Optional[str], postcode: str) -> Optional[Match[str]]:
""" Match the given postcode against the postcode pattern for this
matcher. Returns a `re.Match` object if the country has a pattern
return self.country_matcher.get(country_code, self.default_matcher).match(postcode)
-
def normalize(self, country_code: str, match: Match[str]) -> str:
""" Return the default format of the postcode for the given match.
`match` must be a `re.Match` object previously returned by
Cursor = psycopg.Cursor[Any]
Connection = psycopg.Connection[Any]
+
def execute_scalar(conn: Connection, sql: psycopg.abc.Query, args: Any = None) -> Any:
""" Execute query that returns a single value. The value is returned.
If the query yields more than one row, a ValueError is raised.
def table_exists(conn: Connection, table: str) -> bool:
""" Check that a table with the given name exists in the database.
"""
- num = execute_scalar(conn,
- """SELECT count(*) FROM pg_tables
- WHERE tablename = %s and schemaname = 'public'""", (table, ))
+ num = execute_scalar(
+ conn,
+ """SELECT count(*) FROM pg_tables
+ WHERE tablename = %s and schemaname = 'public'""", (table, ))
return num == 1 if isinstance(num, int) else False
""" Check if the table 'table' exists and has a column with name 'column'.
"""
has_column = execute_scalar(conn,
- """SELECT count(*) FROM information_schema.columns
- WHERE table_name = %s and column_name = %s""",
- (table, column))
+ """SELECT count(*) FROM information_schema.columns
+ WHERE table_name = %s and column_name = %s""",
+ (table, column))
return has_column > 0 if isinstance(has_column, int) else False
return True
+
def drop_tables(conn: Connection, *names: str,
- if_exists: bool = True, cascade: bool = False) -> None:
+ if_exists: bool = True, cascade: bool = False) -> None:
""" Drop one or more tables with the given names.
Set `if_exists` to False if a non-existent table should raise
an exception instead of just being ignored. `cascade` will cause
from .connection import Connection, table_exists
+
def set_property(conn: Connection, name: str, value: str) -> None:
""" Add or replace the property with the given name.
"""
QueueItem = Optional[Tuple[psycopg.abc.Query, Any]]
+
class QueryPool:
""" Pool to run SQL queries in parallel asynchronous execution.
self.pool = [asyncio.create_task(self._worker_loop(dsn, **conn_args))
for _ in range(pool_size)]
-
async def put_query(self, query: psycopg.abc.Query, params: Any) -> None:
""" Schedule a query for execution.
"""
self.wait_time += time.time() - tstart
await asyncio.sleep(0)
-
async def finish(self) -> None:
""" Wait for all queries to finish and close the pool.
"""
if excp is not None:
raise excp
-
async def _worker_loop(self, dsn: str, **conn_args: Any) -> None:
conn_args['autocommit'] = True
aconn = await psycopg.AsyncConnection.connect(dsn, **conn_args)
str(item[0]), str(item[1]))
# item is still valid here, causing a retry
-
async def __aenter__(self) -> 'QueryPool':
return self
-
async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
await self.finish()
from ..config import Configuration
from ..db.query_pool import QueryPool
+
def _get_partitions(conn: Connection) -> Set[int]:
""" Get the set of partitions currently in use.
"""
return set((row[0] for row in list(cur)))
+
def _get_middle_db_format(conn: Connection, tables: Set[str]) -> str:
""" Returns the version of the slim middle tables.
"""
ps3 = postgis_version >= (3, 0)
return {
'has_index_non_key_column': pg11plus,
- 'spgist_geom' : 'SPGIST' if pg11plus and ps3 else 'GIST'
+ 'spgist_geom': 'SPGIST' if pg11plus and ps3 else 'GIST'
}
+
class SQLPreprocessor:
""" A environment for preprocessing SQL files from the
lib-sql directory.
self.env.globals['db'] = db_info
self.env.globals['postgres'] = _setup_postgresql_features(conn)
-
def run_string(self, conn: Connection, template: str, **kwargs: Any) -> None:
""" Execute the given SQL template string on the connection.
The keyword arguments may supply additional parameters
cur.execute(sql)
conn.commit()
-
def run_sql_file(self, conn: Connection, name: str, **kwargs: Any) -> None:
""" Execute the given SQL file on the connection. The keyword arguments
may supply additional parameters for preprocessing.
cur.execute(sql)
conn.commit()
-
async def run_parallel_sql_file(self, dsn: str, name: str, num_threads: int = 1,
**kwargs: Any) -> None:
""" Execute the given SQL files using parallel asynchronous connections.
LOG = logging.getLogger()
+
def _pipe_to_proc(proc: 'subprocess.Popen[bytes]',
fdesc: Union[IO[bytes], gzip.GzipFile]) -> int:
assert proc.stdin is not None
return len(chunk)
+
def execute_file(dsn: str, fname: Path,
ignore_errors: bool = False,
pre_code: Optional[str] = None,
Custom exception and error classes for Nominatim.
"""
+
class UsageError(Exception):
""" An error raised because of bad user input. This error will usually
not cause a stack trace to be printed unless debugging is enabled.
LOG = logging.getLogger()
+
class Indexer:
""" Main indexing routine.
"""
self.tokenizer = tokenizer
self.num_threads = num_threads
-
def has_pending(self) -> bool:
""" Check if any data still needs indexing.
This function must only be used after the import has finished.
cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
return cur.rowcount > 0
-
async def index_full(self, analyse: bool = True) -> None:
""" Index the complete database. This will first index boundaries
followed by all other objects. When `analyse` is True, then the
if not self.has_pending():
break
-
async def index_boundaries(self, minrank: int, maxrank: int) -> int:
""" Index only administrative boundaries within the given rank range.
"""
(minrank, maxrank))
total_tuples = {row.rank_address: row.count for row in cur}
-
with self.tokenizer.name_analyzer() as analyzer:
for rank in range(max(1, minrank), maxrank + 1):
if rank >= 30:
return total
-
async def index_postcodes(self) -> int:
"""Index the entries of the location_postcode table.
"""
return await self._index(runners.PostcodeRunner(), batch=20)
-
def update_status_table(self) -> None:
""" Update the status in the status table to 'indexed'.
"""
if total_tuples > 0:
async with await psycopg.AsyncConnection.connect(
- self.dsn, row_factory=psycopg.rows.dict_row) as aconn,\
+ self.dsn, row_factory=psycopg.rows.dict_row) as aconn, \
QueryPool(self.dsn, self.num_threads, autocommit=True) as pool:
fetcher_time = 0.0
tstart = time.time()
return progress.done()
-
def _prepare_indexing(self, runner: runners.Runner) -> int:
with connect(self.dsn) as conn:
hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
INITIAL_PROGRESS = 10
+
class ProgressLogger:
""" Tracks and prints progress for the indexing process.
`name` is the name of the indexing step being tracked.
from ..data.place_info import PlaceInfo
from ..tokenizer.base import AbstractAnalyzer
-# pylint: disable=C0111
def _mk_valuelist(template: str, num: int) -> pysql.Composed:
return pysql.SQL(',').join([pysql.SQL(template)] * num)
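# Illustrative example (not part of the original change):
# _mk_valuelist("(%s, %s)", 3) composes to the SQL fragment
# "(%s, %s),(%s, %s),(%s, %s)", i.e. one placeholder tuple per row in a batch.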
+
def _analyze_place(place: DictRow, analyzer: AbstractAnalyzer) -> Json:
return Json(analyzer.process_place(PlaceInfo(place)))
LATERAL placex_indexing_prepare(px) as extra """)
UPDATE_LINE = "(%s, %s::hstore, %s::hstore, %s::int, %s::jsonb)"
+
class AbstractPlacexRunner:
""" Returns SQL commands for indexing of the placex table.
"""
self.rank = rank
self.analyzer = analyzer
-
def index_places_query(self, batch_size: int) -> Query:
return pysql.SQL(
""" UPDATE placex
WHERE place_id = v.id
""").format(_mk_valuelist(UPDATE_LINE, batch_size))
-
def index_places_params(self, place: DictRow) -> Sequence[Any]:
return (place['place_id'],
place['name'],
def __init__(self, analyzer: AbstractAnalyzer) -> None:
self.analyzer = analyzer
-
def name(self) -> str:
return "interpolation lines (location_property_osmline)"
return """SELECT count(*) FROM location_property_osmline
WHERE indexed_status > 0"""
-
def sql_get_objects(self) -> Query:
return """SELECT place_id, get_interpolation_address(address, osm_id) as address
FROM location_property_osmline
WHERE indexed_status > 0
ORDER BY geometry_sector"""
-
def index_places_query(self, batch_size: int) -> Query:
return pysql.SQL("""UPDATE location_property_osmline
SET indexed_status = 0, address = v.addr, token_info = v.ti
WHERE place_id = v.id
""").format(_mk_valuelist("(%s, %s::hstore, %s::jsonb)", batch_size))
-
def index_places_params(self, place: DictRow) -> Sequence[Any]:
return (place['place_id'], place['address'],
_analyze_place(place, self.analyzer))
-
class PostcodeRunner(Runner):
""" Provides the SQL commands for indexing the location_postcode table.
"""
def name(self) -> str:
return "postcodes (location_postcode)"
-
def sql_count_objects(self) -> Query:
return 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0'
-
def sql_get_objects(self) -> Query:
return """SELECT place_id FROM location_postcode
WHERE indexed_status > 0
ORDER BY country_code, postcode"""
-
def index_places_query(self, batch_size: int) -> Query:
return pysql.SQL("""UPDATE location_postcode SET indexed_status = 0
WHERE place_id IN ({})""")\
.format(pysql.SQL(',').join((pysql.Placeholder() for _ in range(batch_size))))
-
def index_places_params(self, place: DictRow) -> Sequence[Any]:
return (place['place_id'], )
from ..db.connection import Connection
from ..data.place_info import PlaceInfo
+
class AbstractAnalyzer(ABC):
""" The analyzer provides the functions for analysing names and building
the token database.
def __enter__(self) -> 'AbstractAnalyzer':
return self
-
def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
self.close()
-
@abstractmethod
def close(self) -> None:
""" Free all resources used by the analyzer.
"""
-
@abstractmethod
def get_word_token_info(self, words: List[str]) -> List[Tuple[str, str, int]]:
""" Return token information for the given list of words.
(original word, word token, word id).
"""
-
@abstractmethod
def normalize_postcode(self, postcode: str) -> str:
""" Convert the postcode to its standardized form.
The given postcode after normalization.
"""
-
@abstractmethod
def update_postcodes_from_db(self) -> None:
""" Update the tokenizer's postcode tokens from the current content
of the `location_postcode` table.
"""
-
@abstractmethod
def update_special_phrases(self,
phrases: Iterable[Tuple[str, str, str, str]],
ones that already exist.
"""
-
@abstractmethod
def add_country_names(self, country_code: str, names: Dict[str, str]) -> None:
""" Add the given names to the tokenizer's list of country tokens.
names: Dictionary of name type to name.
"""
-
@abstractmethod
def process_place(self, place: PlaceInfo) -> Any:
""" Extract tokens for the given place and compute the
"""
-
class AbstractTokenizer(ABC):
""" The tokenizer instance is the central instance of the tokenizer in
the system. There will only be a single instance of the tokenizer
tokenizers.
"""
-
@abstractmethod
def init_from_project(self, config: Configuration) -> None:
""" Initialise the tokenizer from an existing database setup.
config: Read-only object with configuration options.
"""
-
@abstractmethod
def finalize_import(self, config: Configuration) -> None:
""" This function is called at the very end of an import when all
config: Read-only object with configuration options.
"""
-
@abstractmethod
def update_sql_functions(self, config: Configuration) -> None:
""" Update the SQL part of the tokenizer. This function is called
config: Read-only object with configuration options.
"""
-
@abstractmethod
def check_database(self, config: Configuration) -> Optional[str]:
""" Check that the database is set up correctly and ready for being
how to resolve the issue. If everything is okay, return `None`.
"""
-
@abstractmethod
def update_statistics(self, config: Configuration, threads: int = 1) -> None:
""" Recompute any tokenizer statistics necessary for efficient lookup.
it to be called in order to work.
"""
-
@abstractmethod
def update_word_tokens(self) -> None:
""" Do house-keeping on the tokenizers internal data structures.
Remove unused word tokens, resort data etc.
"""
-
@abstractmethod
def name_analyzer(self) -> AbstractAnalyzer:
""" Create a new analyzer for tokenizing names and queries
call the close() function before destructing the analyzer.
"""
-
@abstractmethod
def most_frequent_words(self, conn: Connection, num: int) -> List[str]:
""" Return a list of the most frequent full words in the database.
LOG = logging.getLogger()
+
def _import_tokenizer(name: str) -> TokenizerModule:
""" Load the tokenizer.py module from project directory.
"""
# Load optional sanitizer rule set.
self.sanitizer_rules = rules.get('sanitizers', [])
-
def load_config_from_db(self, conn: Connection) -> None:
""" Get previously saved parts of the configuration from the
database.
self.analysis_rules = []
self._setup_analysis()
-
def save_config_to_db(self, conn: Connection) -> None:
""" Save the part of the configuration that cannot be changed into
the database.
set_property(conn, DBCFG_IMPORT_TRANS_RULES, self.transliteration_rules)
set_property(conn, DBCFG_IMPORT_ANALYSIS_RULES, json.dumps(self.analysis_rules))
-
def make_sanitizer(self) -> PlaceSanitizer:
""" Create a place sanitizer from the configured rules.
"""
return PlaceSanitizer(self.sanitizer_rules, self.config)
-
def make_token_analysis(self) -> ICUTokenAnalysis:
""" Create a token analyser from the reviouly loaded rules.
"""
return ICUTokenAnalysis(self.normalization_rules,
self.transliteration_rules, self.analysis)
-
def get_search_rules(self) -> str:
""" Return the ICU rules to be used during search.
The rules combine normalization and transliteration.
rules.write(self.transliteration_rules)
return rules.getvalue()
-
def get_normalization_rules(self) -> str:
""" Return rules for normalisation of a term.
"""
return self.normalization_rules
-
def get_transliteration_rules(self) -> str:
""" Return the rules for converting a string into its asciii representation.
"""
return self.transliteration_rules
-
def _setup_analysis(self) -> None:
""" Process the rules used for creating the various token analyzers.
"""
- self.analysis: Dict[Optional[str], TokenAnalyzerRule] = {}
+ self.analysis: Dict[Optional[str], TokenAnalyzerRule] = {}
if not isinstance(self.analysis_rules, list):
raise UsageError("Configuration section 'token-analysis' must be a list.")
norm = Transliterator.createFromRules("rule_loader_normalization",
self.normalization_rules)
trans = Transliterator.createFromRules("rule_loader_transliteration",
- self.transliteration_rules)
+ self.transliteration_rules)
for section in self.analysis_rules:
name = section.get('id', None)
self.analysis[name] = TokenAnalyzerRule(section, norm, trans,
self.config)
-
@staticmethod
def _cfg_to_icu_rules(rules: Mapping[str, Any], section: str) -> str:
""" Load an ICU ruleset from the given section. If the section is a
self.config = self._analysis_mod.configure(rules, normalizer,
transliterator)
-
def create(self, normalizer: Any, transliterator: Any) -> Analyzer:
""" Create a new analyser instance for the given rule.
"""
from .token_analysis.base import Analyzer
if TYPE_CHECKING:
- from typing import Any
- from .icu_rule_loader import TokenAnalyzerRule # pylint: disable=cyclic-import
+ from typing import Any # noqa
+ from .icu_rule_loader import TokenAnalyzerRule
+
class ICUTokenAnalysis:
""" Container class collecting the transliterators and token analysis
self.analysis = {name: arules.create(self.normalizer, self.to_ascii)
for name, arules in analysis_rules.items()}
-
def get_analyzer(self, name: Optional[str]) -> Analyzer:
""" Return the given named analyzer. If no analyzer with that
name exists, return the default analyzer.
from psycopg.types.json import Jsonb
from psycopg import sql as pysql
-from ..db.connection import connect, Connection, Cursor, server_version_tuple,\
+from ..db.connection import connect, Connection, Cursor, server_version_tuple, \
drop_tables, table_exists, execute_scalar
from ..config import Configuration
from ..db.sql_preprocessor import SQLPreprocessor
LOG = logging.getLogger()
-WORD_TYPES =(('country_names', 'C'),
- ('postcodes', 'P'),
- ('full_word', 'W'),
- ('housenumbers', 'H'))
+WORD_TYPES = (('country_names', 'C'),
+ ('postcodes', 'P'),
+ ('full_word', 'W'),
+ ('housenumbers', 'H'))
+
def create(dsn: str, data_dir: Path) -> 'ICUTokenizer':
""" Create a new instance of the tokenizer provided by this module.
self.data_dir = data_dir
self.loader: Optional[ICURuleLoader] = None
-
def init_new_db(self, config: Configuration, init_db: bool = True) -> None:
""" Set up a new tokenizer for the database.
self._setup_db_tables(config)
self._create_base_indices(config, 'word')
-
def init_from_project(self, config: Configuration) -> None:
""" Initialise the tokenizer from the project directory.
"""
with connect(self.dsn) as conn:
self.loader.load_config_from_db(conn)
-
def finalize_import(self, config: Configuration) -> None:
""" Do any required postprocessing to make the tokenizer data ready
for use.
"""
self._create_lookup_indices(config, 'word')
-
def update_sql_functions(self, config: Configuration) -> None:
""" Reimport the SQL functions for this tokenizer.
"""
sqlp = SQLPreprocessor(conn, config)
sqlp.run_sql_file(conn, 'tokenizer/icu_tokenizer.sql')
-
def check_database(self, config: Configuration) -> None:
""" Check that the tokenizer is set up correctly.
"""
# Will throw an error if there is an issue.
self.init_from_project(config)
-
def update_statistics(self, config: Configuration, threads: int = 2) -> None:
""" Recompute frequencies for all name words.
"""
SELECT unnest(nameaddress_vector) as id, count(*)
FROM search_name GROUP BY id""")
cur.execute('CREATE INDEX ON addressword_frequencies(id)')
- cur.execute("""CREATE OR REPLACE FUNCTION word_freq_update(wid INTEGER,
- INOUT info JSONB)
- AS $$
- DECLARE rec RECORD;
- BEGIN
- IF info is null THEN
- info = '{}'::jsonb;
- END IF;
- FOR rec IN SELECT count FROM word_frequencies WHERE id = wid
- LOOP
- info = info || jsonb_build_object('count', rec.count);
- END LOOP;
- FOR rec IN SELECT count FROM addressword_frequencies WHERE id = wid
- LOOP
- info = info || jsonb_build_object('addr_count', rec.count);
- END LOOP;
- IF info = '{}'::jsonb THEN
- info = null;
- END IF;
- END;
- $$ LANGUAGE plpgsql IMMUTABLE;
- """)
+ cur.execute("""
+ CREATE OR REPLACE FUNCTION word_freq_update(wid INTEGER,
+ INOUT info JSONB)
+ AS $$
+ DECLARE rec RECORD;
+ BEGIN
+ IF info is null THEN
+ info = '{}'::jsonb;
+ END IF;
+ FOR rec IN SELECT count FROM word_frequencies WHERE id = wid
+ LOOP
+ info = info || jsonb_build_object('count', rec.count);
+ END LOOP;
+ FOR rec IN SELECT count FROM addressword_frequencies WHERE id = wid
+ LOOP
+ info = info || jsonb_build_object('addr_count', rec.count);
+ END LOOP;
+ IF info = '{}'::jsonb THEN
+ info = null;
+ END IF;
+ END;
+ $$ LANGUAGE plpgsql IMMUTABLE;
+ """)
LOG.info('Update word table with recomputed frequencies')
drop_tables(conn, 'tmp_word')
cur.execute("""CREATE TABLE tmp_word AS
self._create_lookup_indices(config, 'tmp_word')
self._move_temporary_word_table('tmp_word')
-
-
def _cleanup_housenumbers(self) -> None:
""" Remove unused house numbers.
"""
(list(candidates.values()), ))
conn.commit()
-
-
def update_word_tokens(self) -> None:
""" Remove unused tokens.
"""
self._cleanup_housenumbers()
LOG.warning("Tokenizer house-keeping done.")
-
def name_analyzer(self) -> 'ICUNameAnalyzer':
""" Create a new analyzer for tokenizing names and queries
            using this tokenizer. Analyzers are context managers and should
return ICUNameAnalyzer(self.dsn, self.loader.make_sanitizer(),
self.loader.make_token_analysis())
-
def most_frequent_words(self, conn: Connection, num: int) -> List[str]:
""" Return a list of the `num` most frequent full words
in the database.
ORDER BY count DESC LIMIT %s""", (num,))
return list(s[0].split('@')[0] for s in cur)
-
def _save_config(self) -> None:
""" Save the configuration that needs to remain stable for the given
database as database properties.
with connect(self.dsn) as conn:
self.loader.save_config_to_db(conn)
-
def _setup_db_tables(self, config: Configuration) -> None:
""" Set up the word table and fill it with pre-computed word
frequencies.
""")
conn.commit()
-
def _create_base_indices(self, config: Configuration, table_name: str) -> None:
""" Set up the word table and fill it with pre-computed word
frequencies.
column_type=ctype)
conn.commit()
-
def _create_lookup_indices(self, config: Configuration, table_name: str) -> None:
""" Create additional indexes used when running the API.
"""
with connect(self.dsn) as conn:
sqlp = SQLPreprocessor(conn, config)
# Index required for details lookup.
- sqlp.run_string(conn, """
+ sqlp.run_string(
+ conn,
+ """
CREATE INDEX IF NOT EXISTS idx_{{table_name}}_word_id
ON {{table_name}} USING BTREE (word_id) {{db.tablespace.search_index}}
- """,
- table_name=table_name)
+ """,
+ table_name=table_name)
conn.commit()
-
def _move_temporary_word_table(self, old: str) -> None:
""" Rename all tables and indexes used by the tokenizer.
"""
conn.commit()
-
-
class ICUNameAnalyzer(AbstractAnalyzer):
""" The ICU analyzer uses the ICU library for splitting names.
self._cache = _TokenCache()
-
def close(self) -> None:
""" Free all resources used by the analyzer.
"""
self.conn.close()
self.conn = None
-
def _search_normalized(self, name: str) -> str:
""" Return the search token transliteration of the given name.
"""
return cast(str, self.token_analysis.search.transliterate(name)).strip()
-
def _normalized(self, name: str) -> str:
""" Return the normalized version of the given name with all
non-relevant information removed.
"""
return cast(str, self.token_analysis.normalizer.transliterate(name)).strip()
-
def get_word_token_info(self, words: Sequence[str]) -> List[Tuple[str, str, int]]:
""" Return token information for the given list of words.
If a word starts with # it is assumed to be a full name
part_ids = {r[0]: r[1] for r in cur}
return [(k, v, full_ids.get(v, None)) for k, v in full_tokens.items()] \
- + [(k, v, part_ids.get(v, None)) for k, v in partial_tokens.items()]
-
+ + [(k, v, part_ids.get(v, None)) for k, v in partial_tokens.items()]
def normalize_postcode(self, postcode: str) -> str:
""" Convert the postcode to a standardized form.
"""
return postcode.strip().upper()
-
def update_postcodes_from_db(self) -> None:
""" Update postcode tokens in the word table from the location_postcode
table.
with self.conn.cursor() as cur:
cur.executemany("""SELECT create_postcode_word(%s, %s)""", terms)
-
-
-
def update_special_phrases(self, phrases: Iterable[Tuple[str, str, str, str]],
should_replace: bool) -> None:
""" Replace the search index for special phrases with the new phrases.
LOG.info("Total phrases: %s. Added: %s. Deleted: %s",
len(norm_phrases), added, deleted)
-
def _add_special_phrases(self, cursor: Cursor,
new_phrases: Set[Tuple[str, str, str, str]],
existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
return added
-
def _remove_special_phrases(self, cursor: Cursor,
- new_phrases: Set[Tuple[str, str, str, str]],
- existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
+ new_phrases: Set[Tuple[str, str, str, str]],
+ existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
""" Remove all phrases from the database that are no longer in the
new phrase list.
"""
return len(to_delete)
-
def add_country_names(self, country_code: str, names: Mapping[str, str]) -> None:
""" Add default names for the given country to the search index.
"""
self.sanitizer.process_names(info)[0],
internal=True)
-
def _add_country_full_names(self, country_code: str, names: Sequence[PlaceName],
internal: bool = False) -> None:
""" Add names for the given country from an already sanitized
"""
cur.execute(sql, (country_code, list(new_tokens)))
-
def process_place(self, place: PlaceInfo) -> Mapping[str, Any]:
""" Determine tokenizer information about the given place.
return token_info.to_dict()
-
def _process_place_address(self, token_info: '_TokenInfo',
address: Sequence[PlaceName]) -> None:
for item in address:
elif item.kind == 'place':
if not item.suffix:
token_info.add_place(itertools.chain(*self._compute_name_tokens([item])))
- elif not item.kind.startswith('_') and not item.suffix and \
- item.kind not in ('country', 'full', 'inclusion'):
+ elif (not item.kind.startswith('_') and not item.suffix and
+ item.kind not in ('country', 'full', 'inclusion')):
token_info.add_address_term(item.kind,
itertools.chain(*self._compute_name_tokens([item])))
-
def _compute_housenumber_token(self, hnr: PlaceName) -> Tuple[Optional[int], Optional[str]]:
""" Normalize the housenumber and return the word token and the
canonical form.
return result
-
def _retrieve_full_tokens(self, name: str) -> List[int]:
""" Get the full name token for the given name, if it exists.
The name is only retrieved for the standard analyser.
return full
-
def _compute_name_tokens(self, names: Sequence[PlaceName]) -> Tuple[Set[int], Set[int]]:
""" Computes the full name and partial name tokens for the given
dictionary of names.
return full_tokens, partial_tokens
-
def _add_postcode(self, item: PlaceName) -> Optional[str]:
""" Make sure the normalized postcode is present in the word table.
"""
self.address_tokens: Dict[str, str] = {}
self.postcode: Optional[str] = None
-
def _mk_array(self, tokens: Iterable[Any]) -> str:
return f"{{{','.join((str(s) for s in tokens))}}}"
-
def to_dict(self) -> Dict[str, Any]:
""" Return the token information in database importable format.
"""
return out
-
def set_names(self, fulls: Iterable[int], partials: Iterable[int]) -> None:
""" Adds token information for the normalised names.
"""
self.names = self._mk_array(itertools.chain(fulls, partials))
-
def add_housenumber(self, token: Optional[int], hnr: Optional[str]) -> None:
""" Extract housenumber information from a list of normalised
housenumbers.
self.housenumbers.add(hnr)
self.housenumber_tokens.add(token)
-
def add_street(self, tokens: Iterable[int]) -> None:
""" Add addr:street match terms.
"""
self.street_tokens = set()
self.street_tokens.update(tokens)
-
def add_place(self, tokens: Iterable[int]) -> None:
""" Add addr:place search and match terms.
"""
self.place_tokens.update(tokens)
-
def add_address_term(self, key: str, partials: Iterable[int]) -> None:
""" Add additional address terms.
"""
self.handlers.append(module.create(SanitizerConfig(func)))
-
def process_names(self, place: PlaceInfo) -> Tuple[List[PlaceName], List[PlaceName]]:
""" Extract a sanitized list of names and address parts from the
given place. The function returns a tuple
self.names = self._convert_name_dict(place.name)
self.address = self._convert_name_dict(place.address)
-
@staticmethod
def _convert_name_dict(names: Optional[Mapping[str, str]]) -> List[PlaceName]:
""" Convert a dictionary of names into a list of PlaceNames.
from .base import ProcessInfo
from .config import SanitizerConfig
+
class _HousenumberSanitizer:
def __init__(self, config: SanitizerConfig) -> None:
self.filter_name = config.get_filter('convert-to-name', 'FAIL_ALL')
-
def __call__(self, obj: ProcessInfo) -> None:
if not obj.address:
return
obj.address = new_address
-
def sanitize(self, value: str) -> Iterator[str]:
""" Extract housenumbers in a regularized format from an OSM value.
if hnr:
yield from self._regularize(hnr)
-
def _regularize(self, hnr: str) -> Iterator[str]:
yield hnr
from .base import ProcessInfo
from .config import SanitizerConfig
+
class _PostcodeSanitizer:
def __init__(self, config: SanitizerConfig) -> None:
if default_pattern is not None and isinstance(default_pattern, str):
self.matcher.set_default_pattern(default_pattern)
-
def __call__(self, obj: ProcessInfo) -> None:
if not obj.address:
return
postcode.name = formatted[0]
postcode.set_attr('variant', formatted[1])
-
def scan(self, postcode: str, country: Optional[str]) -> Optional[Tuple[str, str]]:
""" Check the postcode for correct formatting and return the
normalized version. Returns None if the postcode does not
assert country is not None
- return self.matcher.normalize(country, match),\
- ' '.join(filter(lambda p: p is not None, match.groups()))
-
-
+ return self.matcher.normalize(country, match), \
+ ' '.join(filter(lambda p: p is not None, match.groups()))
def create(config: SanitizerConfig) -> Callable[[ProcessInfo], None]:
COUNTY_MATCH = re.compile('(.*), [A-Z][A-Z]')
+
def _clean_tiger_county(obj: ProcessInfo) -> None:
""" Remove the state reference from tiger:county tags.
else:
_BaseUserDict = UserDict
+
class SanitizerConfig(_BaseUserDict):
""" The `SanitizerConfig` class is a read-only dictionary
with configuration options for the sanitizer.
return values
-
def get_bool(self, param: str, default: Optional[bool] = None) -> bool:
""" Extract a configuration parameter as a boolean.
return value
-
def get_delimiter(self, default: str = ',;') -> Pattern[str]:
""" Return the 'delimiters' parameter in the configuration as a
compiled regular expression that can be used to split strings on
return re.compile('\\s*[{}]+\\s*'.format(''.join('\\' + d for d in delimiter_set)))
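# Editor's note: illustrative sketch, not part of the patch. For the default
# delimiters ',;' the compiled pattern splits a multi-value tag on any run of
# delimiters and swallows the surrounding whitespace.
import re

delimiter_set = set(',;')
pattern = re.compile('\\s*[{}]+\\s*'.format(''.join('\\' + d for d in delimiter_set)))

assert pattern.split('Foo; Bar ,Baz') == ['Foo', 'Bar', 'Baz']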
-
def get_filter(self, param: str, default: Union[str, Sequence[str]] = 'PASS_ALL'
) -> Callable[[str], bool]:
""" Returns a filter function for the given parameter of the sanitizer
from .base import ProcessInfo\r
from .config import SanitizerConfig\r
\r
+\r
class _TagSanitizer:\r
\r
def __init__(self, config: SanitizerConfig) -> None:\r
\r
self.has_country_code = config.get('country_code', None) is not None\r
\r
-\r
def __call__(self, obj: ProcessInfo) -> None:\r
tags = obj.names if self.type == 'name' else obj.address\r
\r
or not self.filter_name(tag.name):\r
filtered_tags.append(tag)\r
\r
-\r
if self.type == 'name':\r
obj.names = filtered_tags\r
else:\r
obj.address = filtered_tags\r
\r
-\r
def _set_allowed_ranks(self, ranks: Sequence[str]) -> Tuple[bool, ...]:\r
""" Returns a tuple of 31 boolean values corresponding to the\r
address ranks 0-30. Value at index 'i' is True if rank 'i'\r
for i in range(start, end + 1):\r
allowed_ranks[i] = True\r
\r
-\r
return tuple(allowed_ranks)\r
\r
\r
from .base import ProcessInfo
from .config import SanitizerConfig
+
def create(config: SanitizerConfig) -> Callable[[ProcessInfo], None]:
""" Create a name processing function that splits name values with
multiple values into their components.
from .base import ProcessInfo
from .config import SanitizerConfig
+
class _AnalyzerByLanguage:
""" Processor for tagging the language of names in a place.
"""
self._compute_default_languages(config.get('use-defaults', 'no'))
-
def _compute_default_languages(self, use_defaults: str) -> None:
self.deflangs: Dict[Optional[str], List[str]] = {}
for ccode, clangs in country_info.iterate('languages'):
if len(clangs) == 1 or use_defaults == 'all':
if self.whitelist:
- self.deflangs[ccode] = [l for l in clangs if l in self.whitelist]
+ self.deflangs[ccode] = [cl for cl in clangs if cl in self.whitelist]
else:
self.deflangs[ccode] = clangs
-
def _suffix_matches(self, suffix: str) -> bool:
if self.whitelist is None:
return len(suffix) in (2, 3) and suffix.islower()
return suffix in self.whitelist
-
def __call__(self, obj: ProcessInfo) -> None:
if not obj.names:
return
else:
langs = self.deflangs.get(obj.place.country_code)
-
if langs:
if self.replace:
name.set_attr('analyzer', langs[0])
else:
more_names.append(name.clone(attr={'analyzer': langs[0]}))
- more_names.extend(name.clone(attr={'analyzer': l}) for l in langs[1:])
+ more_names.extend(name.clone(attr={'analyzer': lg}) for lg in langs[1:])
obj.names.extend(more_names)
from .config import SanitizerConfig
from ...data.place_name import PlaceName
+
def create(_: SanitizerConfig) -> Callable[[ProcessInfo], None]:
"""Set up the sanitizer
"""
return tag_japanese
+
def reconbine_housenumber(
new_address: List[PlaceName],
tmp_housenumber: Optional[str],
)
return new_address
+
def reconbine_place(
new_address: List[PlaceName],
tmp_neighbourhood: Optional[str],
)
)
return new_address
+
+
def tag_japanese(obj: ProcessInfo) -> None:
"""Recombine kind of address
"""
from ...typing import Protocol
from ...data.place_name import PlaceName
+
class Analyzer(Protocol):
""" The `create()` function of an analysis module needs to return an
object that implements the following functions.
from ...config import flatten_config_list
from ...errors import UsageError
+
class ICUVariant(NamedTuple):
""" A single replacement rule for variant creation.
"""
def __init__(self, normalizer: Any) -> None:
self.norm = normalizer
-
def compute(self, rule: Any) -> Iterator[ICUVariant]:
""" Generator for all ICUVariant tuples from a single variant rule.
"""
for froms, tos in _create_variants(*src, repl, decompose):
yield ICUVariant(froms, tos)
-
def _parse_variant_word(self, name: str) -> Optional[Tuple[str, str, str]]:
name = name.strip()
match = re.fullmatch(r'([~^]?)([^~$^]*)([~$]?)', name)
from .config_variants import get_variant_config
from .generic_mutation import MutationVariantGenerator
-### Configuration section
+# Configuration section
+
def configure(rules: Mapping[str, Any], normalizer: Any, _: Any) -> Dict[str, Any]:
""" Extract and preprocess the configuration for this module.
return config
-### Analysis section
+# Analysis section
def create(normalizer: Any, transliterator: Any,
config: Mapping[str, Any]) -> 'GenericTokenAnalysis':
# set up mutation rules
self.mutations = [MutationVariantGenerator(*cfg) for cfg in config['mutations']]
-
def get_canonical_id(self, name: PlaceName) -> str:
""" Return the normalized form of the name. This is the standard form
from which possible variants for the name can be derived.
"""
return cast(str, self.norm.transliterate(name.name)).strip()
-
def compute_variants(self, norm_name: str) -> List[str]:
""" Compute the spelling variants for the given normalized name
and transliterate the result.
return [name for name in self._transliterate_unique_list(norm_name, variants) if name]
-
def _transliterate_unique_list(self, norm_name: str,
iterable: Iterable[str]) -> Iterator[Optional[str]]:
seen = set()
seen.add(variant)
yield self.to_ascii.transliterate(variant).strip()
-
def _generate_word_variants(self, norm_name: str) -> Iterable[str]:
baseform = '^ ' + norm_name + ' ^'
baselen = len(baseform)
LOG = logging.getLogger()
+
def _zigzag(outer: Iterable[str], inner: Iterable[str]) -> Iterator[str]:
return itertools.chain.from_iterable(itertools.zip_longest(outer, inner, fillvalue=''))
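# Editor's note: illustrative usage, not part of the patch. _zigzag interleaves
# name parts with separator strings; joining the result rebuilds the name with
# one particular choice of fillers.
import itertools

parts = ['weg', '1']
seps = ['-']
interleaved = itertools.chain.from_iterable(itertools.zip_longest(parts, seps, fillvalue=''))
assert ''.join(interleaved) == 'weg-1'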
"This is not allowed.", pattern)
raise UsageError("Bad mutation pattern in configuration.")
-
def generate(self, names: Iterable[str]) -> Iterator[str]:
""" Generator function for the name variants. 'names' is an iterable
over a set of names for which the variants are to be generated.
for seps in self._fillers(len(parts)):
yield ''.join(_zigzag(parts, seps))
-
def _fillers(self, num_parts: int) -> Iterator[Tuple[str, ...]]:
""" Returns a generator for strings to join the given number of string
parts in all possible combinations.
RE_ALPHA_DIGIT = re.compile(r'([^\s\d␣])\s*(\d)')
RE_NAMED_PART = re.compile(r'[a-z]{4}')
-### Configuration section
+# Configuration section
+
def configure(*_: Any) -> None:
""" All behaviour is currently hard-coded.
"""
return None
-### Analysis section
+# Analysis section
+
-def create(normalizer: Any, transliterator: Any, config: None) -> 'HousenumberTokenAnalysis': # pylint: disable=W0613
+def create(normalizer: Any, transliterator: Any, config: None) -> 'HousenumberTokenAnalysis':
""" Create a new token analysis instance for this module.
"""
return HousenumberTokenAnalysis(normalizer, transliterator)
#
# This file is part of Nominatim. (https://nominatim.org)
#
-# Copyright (C) 2022 by the Nominatim developer community.
+# Copyright (C) 2024 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Specialized processor for postcodes. Supports a 'lookup' variant of the
from ...data.place_name import PlaceName
from .generic_mutation import MutationVariantGenerator
-### Configuration section
+# Configuration section
+
def configure(*_: Any) -> None:
""" All behaviour is currently hard-coded.
"""
return None
-### Analysis section
+# Analysis section
+
-def create(normalizer: Any, transliterator: Any, config: None) -> 'PostcodeTokenAnalysis': # pylint: disable=W0613
+def create(normalizer: Any, transliterator: Any, config: None) -> 'PostcodeTokenAnalysis':
""" Create a new token analysis instance for this module.
"""
return PostcodeTokenAnalysis(normalizer, transliterator)
self.mutator = MutationVariantGenerator(' ', (' ', ''))
-
def get_canonical_id(self, name: PlaceName) -> str:
""" Return the standard form of the postcode.
"""
return name.name.strip().upper()
-
def compute_variants(self, norm_name: str) -> List[str]:
""" Compute the spelling variants for the given normalized postcode.
LOG = logging.getLogger()
+
def _run_osm2pgsql(dsn: str, options: MutableMapping[str, Any]) -> None:
run_osm2pgsql(options)
LOG = logging.getLogger()
+
def _get_place_info(cursor: Cursor, osm_id: Optional[str],
place_id: Optional[int]) -> DictCursorResult:
sql = """SELECT place_id, extra.*
from textwrap import dedent
from ..config import Configuration
-from ..db.connection import connect, Connection, server_version_tuple,\
+from ..db.connection import connect, Connection, server_version_tuple, \
index_exists, table_exists, execute_scalar
from ..db import properties
from ..errors import UsageError
CHECKLIST = []
+
class CheckState(Enum):
""" Possible states of a check. FATAL stops check execution entirely.
"""
NOT_APPLICABLE = 3
WARN = 4
+
CheckResult = Union[CheckState, Tuple[CheckState, Mapping[str, Any]]]
CheckFunc = Callable[[Connection, Configuration], CheckResult]
+
def _check(hint: Optional[str] = None) -> Callable[[CheckFunc], CheckFunc]:
""" Decorator for checks. It adds the function to the list of
checks to execute and adds the code for printing progress messages.
return decorator
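# Editor's note: a hedged sketch of the registration pattern described above, not
# the actual decorator. Every decorated check lands in a module-level list that
# check_database() can then iterate over.
from typing import Callable, List

_DEMO_CHECKLIST: List[Callable[[], bool]] = []

def _register(func: Callable[[], bool]) -> Callable[[], bool]:
    _DEMO_CHECKLIST.append(func)
    return func

@_register
def demo_check() -> bool:
    return True

assert _DEMO_CHECKLIST == [demo_check]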
+
class _BadConnection:
def __init__(self, msg: str) -> None:
""" Dummy function to provide the implementation.
"""
+
def check_database(config: Configuration) -> int:
""" Run a number of checks on the database and return the status.
"""
try:
conn = connect(config.get_libpq_dsn())
except UsageError as err:
- conn = _BadConnection(str(err)) # type: ignore[assignment]
+ conn = _BadConnection(str(err)) # type: ignore[assignment]
overall_result = 0
for check in CHECKLIST:
'idx_osmline_parent_osm_id',
'idx_postcode_id',
'idx_postcode_postcode'
- ]
+ ]
# These won't exist if --reverse-only import was used
if table_exists(conn, 'search_name'):
return CheckState.OK
+
@_check(hint="""\
Database version ({db_version}) doesn't match Nominatim version ({nom_version})
instruction=instruction,
config=config)
+
@_check(hint="""\
placex table not found
return CheckState.OK
if freeze.is_frozen(conn):
- index_cmd="""\
+ index_cmd = """\
Database is marked frozen, it cannot be updated.
Low counts of unindexed places are fine."""
return CheckState.WARN, dict(count=cnt, index_cmd=index_cmd)
- PostgreSQL version: {postgresql_ver}
- PostGIS version: {postgis_ver}
- OS: {os_name_info()}
-
-
+
+
**Hardware Configuration:**
- RAM: {friendly_memory_string(psutil.virtual_memory().total)}
- number of CPUs: {psutil.cpu_count(logical=False)}
```
{run_command(["df", "-h"])}
```
-
+
**lsblk - list block devices: **
```
{run_command("lsblk")}
```
-
-
+
+
**Postgresql Configuration:**
```
{postgresql_config}
LOG = logging.getLogger()
+
async def convert(project_dir: Optional[Union[str, Path]],
outfile: Path, options: Set[str]) -> None:
""" Export an existing database to sqlite. The resulting database
self.dest = dest
self.options = options
-
async def write(self) -> None:
""" Create the database structure and copy the data from
the source database to the destination.
await self.create_word_table()
await self.create_indexes()
-
async def create_tables(self) -> None:
""" Set up the database tables.
"""
sa.func.RecoverGeometryColumn(table.name, col.name, 4326,
col.type.subtype.upper(), 'XY')))
-
async def create_class_tables(self) -> None:
""" Set up the table that serve class/type-specific geometries.
"""
sa.Column('place_id', sa.BigInteger),
sa.Column('centroid', Geometry))
-
async def create_word_table(self) -> None:
""" Create the word table.
This table needs the property information to determine the
await self.dest.connection.run_sync(sa.Index('idx_word_woken', dest.c.word_token).create)
-
async def copy_data(self) -> None:
""" Copy data for all registered tables.
"""
data = [{'tablename': t} for t in self.dest.t.meta.tables]
await self.dest.execute(pg_tables.insert().values(data))
-
async def create_indexes(self) -> None:
""" Add indexes necessary for the frontend.
"""
await self.dest.execute(sa.select(
sa.func.CreateSpatialIndex(t, 'centroid')))
-
async def create_spatial_index(self, table: str, column: str) -> None:
""" Create a spatial index on the given table and column.
"""
await self.dest.execute(sa.select(
sa.func.CreateSpatialIndex(getattr(self.dest.t, table).name, column)))
-
async def create_index(self, table_name: str, column: str) -> None:
""" Create a simple index on the given table and column.
"""
await self.dest.connection.run_sync(
sa.Index(f"idx_{table}_{column}", getattr(table.c, column)).create)
-
async def create_search_index(self) -> None:
""" Create the tables and indexes needed for word lookup.
"""
await self.dest.connection.run_sync(
sa.Index('idx_reverse_search_name_word', rsn.c.word).create)
-
def select_from(self, table: str) -> SaSelect:
""" Create the SQL statement to select the source columns and rows.
"""
columns.geometry),
else_=sa.func.ST_SimplifyPreserveTopology(
columns.geometry, 0.0001)
- )).label('geometry'))
+ )).label('geometry'))
sql = sa.select(*(sa.func.ST_AsText(c).label(c.name)
- if isinstance(c.type, Geometry) else c for c in columns))
+ if isinstance(c.type, Geometry) else c for c in columns))
return sql
from ..errors import UsageError
from ..config import Configuration
-from ..db.connection import connect, get_pg_env, Connection, server_version_tuple,\
+from ..db.connection import connect, get_pg_env, Connection, server_version_tuple, \
postgis_version_tuple, drop_tables, table_exists, execute_scalar
from ..db.sql_preprocessor import SQLPreprocessor
from ..db.query_pool import QueryPool
LOG = logging.getLogger()
+
def _require_version(module: str, actual: Tuple[int, int], expected: Tuple[int, int]) -> None:
""" Compares the version for the given module and raises an exception
if the actual version is too old.
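# Editor's note: minimal sketch of the version guard described above; the real
# helper reports the problem and raises Nominatim's UsageError instead.
from typing import Tuple

def _require_version_demo(module: str, actual: Tuple[int, int], expected: Tuple[int, int]) -> None:
    if actual < expected:
        raise ValueError(f'{module} is too old: found {actual}, need at least {expected}')

_require_version_demo('PostgreSQL', (16, 2), (9, 6))  # passes silently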
async def create_search_indices(conn: Connection, config: Configuration,
- drop: bool = False, threads: int = 1) -> None:
+ drop: bool = False, threads: int = 1) -> None:
""" Create tables that have explicit partitioning.
"""
'--number-processes', '1' if options['append'] else str(options['threads']),
'--cache', str(options['osm2pgsql_cache']),
'--style', str(options['osm2pgsql_style'])
- ]
+ ]
if str(options['osm2pgsql_style']).endswith('.lua'):
env['LUA_PATH'] = ';'.join((str(options['osm2pgsql_style_path'] / '?.lua'),
cmd.extend(('--output', 'gazetteer', '--hstore', '--latlon'))
cmd.extend(_mk_tablespace_options('main', options))
-
if options['flatnode_file']:
cmd.extend(('--flat-nodes', options['flatnode_file']))
'wikipedia_%'
]
+
def drop_update_tables(conn: Connection) -> None:
""" Drop all tables only necessary for updating the database from
OSM replication data.
if fpath and fpath.exists():
fpath.unlink()
+
def is_frozen(conn: Connection) -> bool:
""" Returns true if database is in a frozen state
"""
-
return table_exists(conn, 'place') is False
from ..errors import UsageError
from ..config import Configuration
from ..db import properties
-from ..db.connection import connect, Connection,\
+from ..db.connection import connect, Connection, \
table_exists, register_hstore
from ..version import NominatimVersion, NOMINATIM_VERSION, parse_version
from ..tokenizer import factory as tokenizer_factory
LOG = logging.getLogger()
-_MIGRATION_FUNCTIONS : List[Tuple[NominatimVersion, Callable[..., None]]] = []
+_MIGRATION_FUNCTIONS: List[Tuple[NominatimVersion, Callable[..., None]]] = []
+
def migrate(config: Configuration, paths: Any) -> int:
""" Check for the current database version and execute migrations,
LOG = logging.getLogger()
+
def _to_float(numstr: str, max_value: float) -> float:
""" Convert the number in string into a float. The number is expected
to be in the range of [-max_value, max_value]. Otherwise rises a
return num
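# Editor's note: illustrative sketch of the described range check, not the patch
# itself; the real helper raises a UsageError for out-of-range input.
def _to_float_demo(numstr: str, max_value: float) -> float:
    num = float(numstr)
    if not -max_value <= num <= max_value:
        raise ValueError(f'{numstr} is out of the allowed range')
    return num

assert _to_float_demo('13.37', 90.0) == 13.37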
+
class _PostcodeCollector:
""" Collector for postcodes of a single country.
"""
self.collected: Dict[str, PointsCentroid] = defaultdict(PointsCentroid)
self.normalization_cache: Optional[Tuple[str, Optional[str]]] = None
-
def add(self, postcode: str, x: float, y: float) -> None:
""" Add the given postcode to the collection cache. If the postcode
already existed, it is overwritten with the new centroid.
if normalized:
self.collected[normalized] += (x, y)
-
def commit(self, conn: Connection, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
""" Update postcodes for the country from the postcodes selected so far
as well as any externally supplied postcodes.
""").format(pysql.Literal(self.country)),
to_update)
-
- def _compute_changes(self, conn: Connection) \
- -> Tuple[List[Tuple[str, float, float]], List[str], List[Tuple[float, float, str]]]:
+ def _compute_changes(
+ self, conn: Connection
+ ) -> Tuple[List[Tuple[str, float, float]], List[str], List[Tuple[float, float, str]]]:
""" Compute which postcodes from the collected postcodes have to be
added or modified and which from the location_postcode table
have to be deleted.
return to_add, to_delete, to_update
-
def _update_from_external(self, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
""" Look for an external postcode file for the active country in
the project directory and add missing postcodes when found.
finally:
csvfile.close()
-
def _open_external(self, project_dir: Path) -> Optional[TextIO]:
fname = project_dir / f'{self.country}_postcodes.csv'
analyzer.update_postcodes_from_db()
+
def can_compute(dsn: str) -> bool:
"""
Check that the place table exists so that
from psycopg import sql as pysql
from ..config import Configuration
-from ..db.connection import Connection, connect, postgis_version_tuple,\
+from ..db.connection import Connection, connect, postgis_version_tuple, \
drop_tables
from ..db.utils import execute_file
from ..db.sql_preprocessor import SQLPreprocessor
OSM_TYPE = {'N': 'node', 'W': 'way', 'R': 'relation'}
+
def _add_address_level_rows_from_entry(rows: MutableSequence[Tuple[Any, ...]],
entry: Mapping[str, Any]) -> None:
""" Converts a single entry from the JSON format for address rank
The table has the following columns:
country, class, type, rank_search, rank_address
"""
- rows: List[Tuple[Any, ...]] = []
+ rows: List[Tuple[Any, ...]] = []
for entry in levels:
_add_address_level_rows_from_entry(rows, entry)
return 0
+
def recompute_importance(conn: Connection) -> None:
""" Recompute wikipedia links and importance for all entries in placex.
This is a long-running operation that must not be executed in
LOG = logging.getLogger()
+
def init_replication(conn: Connection, base_url: str,
socket_timeout: int = 60) -> None:
""" Set up replication for the server at the given base URL.
LOG.warning("New data available (%i => %i).", seq, state.sequence)
return 0
+
class UpdateState(Enum):
""" Possible states after an update has run.
"""
""" Download a resource from the given URL and return a byte sequence
of the content.
"""
- headers = {"User-Agent" : f"Nominatim (pyosmium/{pyo_version.pyosmium_release})"}
+ headers = {"User-Agent": f"Nominatim (pyosmium/{pyo_version.pyosmium_release})"}
if self.session is not None:
return self.session.get(url.get_full_url(),
- headers=headers, timeout=timeout or None,
- stream=True)
+ headers=headers, timeout=timeout or None,
+ stream=True)
@contextmanager
def _get_url_with_session() -> Iterator[requests.Response]:
import logging
LOG = logging.getLogger()
+
class SpecialPhrasesImporterStatistics():
"""
Class handling statistics of the import
from ...errors import UsageError
from .special_phrase import SpecialPhrase
+
class SPCsvLoader:
"""
Handles loading of special phrases from external csv file.
def __init__(self, csv_path: str) -> None:
self.csv_path = csv_path
-
def generate_phrases(self) -> Iterable[SpecialPhrase]:
""" Open and parse the given csv file.
Create the corresponding SpecialPhrases.
for row in reader:
yield SpecialPhrase(row['phrase'], row['class'], row['type'], row['operator'])
-
def _check_csv_validity(self) -> None:
"""
Check that the csv file has the right extension.
LOG = logging.getLogger()
+
def _classtype_table(phrase_class: str, phrase_type: str) -> str:
""" Return the name of the table for the given class and type.
"""
LOG.warning('Import done.')
self.statistics_handler.notify_import_done()
-
def _fetch_existing_place_classtype_tables(self) -> None:
"""
Fetch existing place_classtype tables.
self.table_phrases_to_delete.add(row[0])
def _load_white_and_black_lists(self) \
- -> Tuple[Mapping[str, Sequence[str]], Mapping[str, Sequence[str]]]:
+ -> Tuple[Mapping[str, Sequence[str]], Mapping[str, Sequence[str]]]:
"""
Load white and black lists from phrases-settings.json.
"""
return (phrase.p_class, phrase.p_type)
-
def _create_classtype_table_and_indexes(self,
class_type_pairs: Iterable[Tuple[str, str]]) -> None:
"""
with self.db_connection.cursor() as db_cursor:
db_cursor.execute("DROP INDEX idx_placex_classtype")
-
def _create_place_classtype_table(self, sql_tablespace: str,
phrase_class: str, phrase_type: str) -> None:
"""
""").format(Identifier(table_name), SQL(sql_tablespace)),
(phrase_class, phrase_type))
-
def _create_place_classtype_indexes(self, sql_tablespace: str,
phrase_class: str, phrase_type: str) -> None:
"""
Identifier(base_table),
SQL(sql_tablespace)))
-
def _grant_access_to_webuser(self, phrase_class: str, phrase_type: str) -> None:
"""
Grant read access on the place_classtype table to the webuser.
.format(Identifier(table_name),
Identifier(self.config.DATABASE_WEBUSER)))
-
def _remove_non_existent_tables_from_db(self) -> None:
"""
Remove special phrases which no longer exist on the wiki.
LOG = logging.getLogger()
+
def _get_wiki_content(lang: str) -> str:
"""
Request and return the wiki page's content
self.type_fix_pattern = re.compile(r'\"|"')
self.languages = self.config.get_str_list('LANGUAGES') or \
- ['af', 'ar', 'br', 'ca', 'cs', 'de', 'en', 'es',
- 'et', 'eu', 'fa', 'fi', 'fr', 'gl', 'hr', 'hu',
- 'ia', 'is', 'it', 'ja', 'mk', 'nl', 'no', 'pl',
- 'ps', 'pt', 'ru', 'sk', 'sl', 'sv', 'uk', 'vi',
- 'lv', 'tr']
-
+ ['af', 'ar', 'br', 'ca', 'cs', 'de', 'en', 'es',
+ 'et', 'eu', 'fa', 'fi', 'fr', 'gl', 'hr', 'hu',
+ 'ia', 'is', 'it', 'ja', 'mk', 'nl', 'no', 'pl',
+ 'ps', 'pt', 'ru', 'sk', 'sl', 'sv', 'uk', 'vi',
+ 'lv', 'tr']
def generate_phrases(self) -> Iterable[SpecialPhrase]:
""" Download the wiki pages for the configured languages
"""
from typing import Any
+
class SpecialPhrase:
"""
Model representing a special phrase.
return False
return self.p_label == other.p_label \
- and self.p_class == other.p_class \
- and self.p_type == other.p_type \
- and self.p_operator == other.p_operator
+ and self.p_class == other.p_class \
+ and self.p_type == other.p_type \
+ and self.p_operator == other.p_operator
def __hash__(self) -> int:
return hash((self.p_label, self.p_class, self.p_type, self.p_operator))
LOG = logging.getLogger()
+
class TigerInput:
""" Context manager that goes through Tiger input files which may
either be in a directory or gzipped together in a tar file.
if data_dir.endswith('.tar.gz'):
try:
- self.tar_handle = tarfile.open(data_dir) # pylint: disable=consider-using-with
+ self.tar_handle = tarfile.open(data_dir)
except tarfile.ReadError as err:
LOG.fatal("Cannot open '%s'. Is this a tar file?", data_dir)
raise UsageError("Cannot open Tiger data file.") from err
if not self.files:
LOG.warning("Tiger data import selected but no files found at %s", data_dir)
-
def __enter__(self) -> 'TigerInput':
return self
-
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
if self.tar_handle:
self.tar_handle.close()
return open(cast(str, fname), encoding='utf-8')
-
def __iter__(self) -> Iterator[Dict[str, Any]]:
""" Iterate over the lines in each file.
"""
async def add_tiger_data(data_dir: str, config: Configuration, threads: int,
- tokenizer: AbstractTokenizer) -> int:
+ tokenizer: AbstractTokenizer) -> int:
""" Import tiger data from directory or tar file `data dir`.
"""
dsn = config.get_libpq_dsn()
"""
from typing import Any, Union, Mapping, TypeVar, Sequence, TYPE_CHECKING
-# Generics variable names do not confirm to naming styles, ignore globally here.
-# pylint: disable=invalid-name,abstract-method,multiple-statements
-# pylint: disable=missing-class-docstring,useless-import-alias
if TYPE_CHECKING:
import os
T_ResultKey = TypeVar('T_ResultKey', int, str)
+
class DictCursorResult(Mapping[str, Any]):
def __getitem__(self, x: Union[int, str]) -> Any: ...
+
DictCursorResults = Sequence[DictCursorResult]
# The following typing features require typing_extensions to work
from typing import Tuple, Any
from collections.abc import Collection
+
class PointsCentroid:
""" Centroid computation from single points using an online algorithm.
More points may be added at any time.
return (float(self.sum_x/self.count)/10000000,
float(self.sum_y/self.count)/10000000)
-
def __len__(self) -> int:
return self.count
-
def __iadd__(self, other: Any) -> 'PointsCentroid':
if isinstance(other, Collection) and len(other) == 2:
if all(isinstance(p, (float, int)) for p in other):
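# Editor's note: a minimal sketch (not part of the patch) of the online centroid
# computation suggested by the return statement above: coordinates are accumulated
# as integers scaled by 1e7 and the centroid is the scaled-back running mean.
class _CentroidDemo:
    def __init__(self):
        self.sum_x = 0
        self.sum_y = 0
        self.count = 0

    def __iadd__(self, pt):
        self.sum_x += int(pt[0] * 10000000)
        self.sum_y += int(pt[1] * 10000000)
        self.count += 1
        return self

    def centroid(self):
        return (self.sum_x / self.count / 10000000,
                self.sum_y / self.count / 10000000)

c = _CentroidDemo()
c += (10.0, 20.0)
c += (12.0, 22.0)
assert c.centroid() == (11.0, 21.0)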
"""
Helper functions for accessing URL.
"""
-from typing import IO
+from typing import IO # noqa
import logging
import urllib.request as urlrequest
LOG = logging.getLogger()
+
def get_url(url: str) -> str:
""" Get the contents from the given URL and return it as a UTF-8 string.
try:
request = urlrequest.Request(url, headers=headers)
- with urlrequest.urlopen(request) as response: # type: IO[bytes]
+ with urlrequest.urlopen(request) as response: # type: IO[bytes]
return response.read().decode('utf-8')
except Exception:
LOG.fatal('Failed to load URL: %s', url)
"""
from typing import NamedTuple, Optional
-# See also https://github.com/PyCQA/pylint/issues/6006
-# pylint: disable=useless-import-alias,unused-import
class NominatimVersion(NamedTuple):
""" Version information for Nominatim. We follow semantic versioning.
return f"{self.major}.{self.minor}.{self.patch_level}"
-
def parse_version(version: str) -> NominatimVersion:
""" Parse a version string into a version consisting of a tuple of
four ints: major, minor, patch level, database patch level
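# Editor's note: a hedged sketch assuming a 'major.minor.patch-dbpatch' string
# format (e.g. '4.4.0-1'); not the real parse_version implementation.
from typing import NamedTuple

class _VersionDemo(NamedTuple):
    major: int
    minor: int
    patch_level: int
    db_patch_level: int

def _parse_version_demo(version: str) -> _VersionDemo:
    major, minor, rest = version.split('.')
    patch, db_patch = rest.split('-')
    return _VersionDemo(int(major), int(minor), int(patch), int(db_patch))

assert _parse_version_demo('4.4.0-1') == _VersionDemo(4, 4, 0, 1)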
# on every execution of 'make'.
# cmake/tool-installed.tmpl is used to build the binary 'nominatim'. Inside
# there is a call to set the variable value below.
-GIT_COMMIT_HASH : Optional[str] = None
+GIT_COMMIT_HASH: Optional[str] = None