| 4.3.x | 2025-09-07 |
| 4.2.x | 2024-11-24 |
| 4.1.x | 2024-08-05 |
-| 4.0.x | 2023-11-02 |
## Reporting a Vulnerability
## List of Previous Incidents
-* 2020-05-04 - [SQL injection issue on /details endpoint](https://lists.openstreetmap.org/pipermail/geocoding/2020-May/002012.html)
+* 2023-11-20 - [SQL injection vulnerability](https://nominatim.org/2023/11/20/release-432.html)
* 2023-02-21 - [cross-site scripting vulnerability](https://nominatim.org/2023/02/21/release-421.html)
+* 2020-05-04 - [SQL injection issue on /details endpoint](https://lists.openstreetmap.org/pipermail/geocoding/2020-May/002012.html)
The library also lacks a proper installation routine, so some manipulation
of the PYTHONPATH is required. At the moment, use is only recommended for
- developers wit some experience in Python.
+ developers with some experience in Python.
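
For example, the package can be made importable by extending the Python module
search path before the first import. This is only a sketch; the directory below
is a placeholder for the checked-out source tree:

```python
import sys

# Placeholder path: point this at the directory that contains the 'nominatim' package.
sys.path.insert(0, '/path/to/Nominatim')

import nominatim  # importable now, without a proper installation
```

Exporting the `PYTHONPATH` environment variable with the same directory before
starting Python has the same effect.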
## Installation
-- If the place had a postcode assigned, take this one only
-- into consideration when it is an area and the place does not have
-- a postcode itself.
- IF location.fromarea AND location.isaddress
+ IF location.fromarea AND location_isaddress
AND (place.address is null or not place.address ? 'postcode')
THEN
place.postcode := null; -- remove the less exact postcode
self.buffer = io.StringIO()
+ def _timestamp(self) -> None:
+ self._write(f'[{dt.datetime.now()}]\n')
+
+
def get_buffer(self) -> str:
return self.buffer.getvalue()
def section(self, heading: str) -> None:
+ self._timestamp()
self._write(f"\n# {heading}\n\n")
def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
+ self._timestamp()
self._write(f'{heading}:\n')
total = 0
for rank, res in results:
def sql(self, conn: AsyncConnection, statement: 'sa.Executable',
params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]) -> None:
+ self._timestamp()
sqlstr = '\n| '.join(textwrap.wrap(self.format_sql(conn, statement, params), width=78))
self._write(f"| {sqlstr}\n\n")
#pylint: disable=singleton-comparison,not-callable
#pylint: disable=too-many-branches,too-many-arguments,too-many-locals,too-many-statements
+def no_index(expr: SaColumn) -> SaColumn:
+ """ Wrap the given expression, so that the query planner will
+ refrain from using the expression for index lookup.
+ """
+ return sa.func.coalesce(sa.null(), expr) # pylint: disable=not-callable
+
+
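# A quick illustration of the trick above (the SQL shown is approximate and
# depends on the SQLAlchemy version and dialect):
#
#   no_index(table.c.rank_address).between(1, 30)
#
# renders roughly as
#
#   coalesce(NULL, <table>.rank_address) BETWEEN :low AND :high
#
# The comparison result is unchanged, because coalesce(NULL, x) evaluates to x,
# but PostgreSQL will not use a plain index on rank_address for the wrapped
# expression.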
def _details_to_bind_params(details: SearchDetails) -> Dict[str, Any]:
""" Create a dictionary from search parameters that can be used
as bind parameters for SQL execute.
def _filter_by_layer(table: SaFromClause, layers: DataLayer) -> SaColumn:
orexpr: List[SaExpression] = []
if layers & DataLayer.ADDRESS and layers & DataLayer.POI:
- orexpr.append(table.c.rank_address.between(1, 30))
+ orexpr.append(no_index(table.c.rank_address).between(1, 30))
elif layers & DataLayer.ADDRESS:
- orexpr.append(table.c.rank_address.between(1, 29))
- orexpr.append(sa.and_(table.c.rank_address == 30,
+ orexpr.append(no_index(table.c.rank_address).between(1, 29))
+ orexpr.append(sa.and_(no_index(table.c.rank_address) == 30,
sa.or_(table.c.housenumber != None,
table.c.address.has_key('addr:housename'))))
elif layers & DataLayer.POI:
- orexpr.append(sa.and_(table.c.rank_address == 30,
+ orexpr.append(sa.and_(no_index(table.c.rank_address) == 30,
table.c.class_.not_in(('place', 'building'))))
if layers & DataLayer.MANMADE:
if not layers & DataLayer.NATURAL:
exclude.extend(('natural', 'water', 'waterway'))
orexpr.append(sa.and_(table.c.class_.not_in(tuple(exclude)),
- table.c.rank_address == 0))
+ no_index(table.c.rank_address) == 0))
else:
include = []
if layers & DataLayer.RAILWAY:
if layers & DataLayer.NATURAL:
include.extend(('natural', 'water', 'waterway'))
orexpr.append(sa.and_(table.c.class_.in_(tuple(include)),
- table.c.rank_address == 0))
+ no_index(table.c.rank_address) == 0))
if len(orexpr) == 1:
return orexpr[0]
else_ = tgeom.c.centroid.ST_Expand(0.05))))\
.order_by(tgeom.c.centroid.ST_Distance(table.c.centroid))
- sql = sql.where(t.c.rank_address.between(MIN_RANK_PARAM, MAX_RANK_PARAM))
+ sql = sql.where(no_index(t.c.rank_address).between(MIN_RANK_PARAM, MAX_RANK_PARAM))
if details.countries:
sql = sql.where(t.c.country_code.in_(COUNTRIES_PARAM))
if details.excluded:
penalty = 0.0
if row.type == 'w':
penalty = 0.3
+ elif row.type == 'W':
+ if len(row.word_token) == 1 and row.word_token == row.word:
+ penalty = 0.2 if row.word.isdigit() else 0.3
elif row.type == 'H':
penalty = sum(0.1 for c in row.word_token if c != ' ' and not c.isdigit())
if all(not c.isdigit() for c in row.word_token):
penalty += 0.2 * (len(row.word_token) - 1)
+ elif row.type == 'C':
+ if len(row.word_token) == 1:
+ penalty = 0.3
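# Reading aid for the type letters above (ICU tokenizer convention, stated here
# as an assumption): 'w' = partial word, 'W' = full word, 'H' = housenumber,
# 'C' = country code. The new branches add an extra penalty for
# single-character full-word and country tokens.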
if row.info is None:
lookup_word = row.word
or (self.bounded_viewbox
and self.viewbox is not None and self.near is not None
and self.viewbox.contains(self.near))
- or self.layers is not None and not self.layers)
+ or (self.layers is not None and not self.layers)
+ or (self.max_rank <= 4 and
+ self.layers is not None and not self.layers & DataLayer.ADDRESS))
def layer_enabled(self, layer: DataLayer) -> bool:
""" Query execution that logs the SQL query when debugging is enabled.
"""
if LOG.isEnabledFor(logging.DEBUG):
- LOG.debug(self.mogrify(query, args).decode('utf-8')) # type: ignore[arg-type]
+ LOG.debug(self.mogrify(query, args).decode('utf-8'))
super().execute(query, args)
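# Note: LOG.isEnabledFor(logging.DEBUG) checks the effective log level, so a
# global logging.basicConfig(level=logging.DEBUG) is enough to surface the
# mogrified SQL; no change to the cursor itself is needed.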
"""
if self.buffer.tell() > 0:
self.buffer.seek(0)
- cur.copy_from(self.buffer, table, columns=columns) # type: ignore[arg-type]
+ cur.copy_from(self.buffer, table, columns=columns)
import subprocess
import sys
from pathlib import Path
-from typing import List, Optional, Tuple, Union, cast
+from typing import List, Optional, Tuple, Union
import psutil
from psycopg2.extensions import make_dsn, parse_dsn
from nominatim.config import Configuration
from nominatim.db.connection import connect
-from nominatim.typing import DictCursorResults
from nominatim.version import NOMINATIM_VERSION
postgresql_ver: str = convert_version(conn.server_version_tuple())
with conn.cursor() as cur:
- cur.execute(f"""
- SELECT datname FROM pg_catalog.pg_database
- WHERE datname='{parse_dsn(config.get_libpq_dsn())['dbname']}'""")
- nominatim_db_exists = cast(Optional[DictCursorResults], cur.fetchall())
- if nominatim_db_exists:
- with connect(config.get_libpq_dsn()) as conn:
- postgis_ver: str = convert_version(conn.postgis_version_tuple())
- else:
- postgis_ver = "Unable to connect to database"
+ num = cur.scalar("SELECT count(*) FROM pg_catalog.pg_database WHERE datname=%s",
+ (parse_dsn(config.get_libpq_dsn())['dbname'], ))
+ nominatim_db_exists = num == 1 if isinstance(num, int) else False
+
+ if nominatim_db_exists:
+ with connect(config.get_libpq_dsn()) as conn:
+ postgis_ver: str = convert_version(conn.postgis_version_tuple())
+ else:
+ postgis_ver = "Unable to connect to database"
postgresql_config: str = get_postgresql_config(int(float(postgresql_ver)))