From: Sarah Hoffmann Date: Tue, 26 Jan 2021 21:45:24 +0000 (+0100) Subject: port replication initialisation to Python X-Git-Tag: v3.7.0~46^2~8 X-Git-Url: https://git.openstreetmap.org./nominatim.git/commitdiff_plain/d78f0ba80470a33a7a76edfe3ace5108684873cd port replication initialisation to Python --- diff --git a/lib/admin/update.php b/lib/admin/update.php index a0fbbc46..04eb7019 100644 --- a/lib/admin/update.php +++ b/lib/admin/update.php @@ -116,64 +116,13 @@ $sBaseURL = getSetting('REPLICATION_URL'); if ($aResult['init-updates']) { - // sanity check that the replication URL is correct - $sBaseState = file_get_contents($sBaseURL.'/state.txt'); - if ($sBaseState === false) { - echo "\nCannot find state.txt file at the configured replication URL.\n"; - echo "Does the URL point to a directory containing OSM update data?\n\n"; - fail('replication URL not reachable.'); - } - // sanity check for pyosmium-get-changes - if (!$sPyosmiumBin) { - echo "\nNOMINATIM_PYOSMIUM_BINARY not configured.\n"; - echo "You need to install pyosmium and set up the path to pyosmium-get-changes\n"; - echo "in your local .env file.\n\n"; - fail('NOMINATIM_PYOSMIUM_BINARY not configured'); - } - - $aOutput = 0; - $oCMD = new \Nominatim\Shell($sPyosmiumBin, '--help'); - exec($oCMD->escapedCmd(), $aOutput, $iRet); - - if ($iRet != 0) { - echo "Cannot execute pyosmium-get-changes.\n"; - echo "Make sure you have pyosmium installed correctly\n"; - echo "and have set up NOMINATIM_PYOSMIUM_BINARY to point to pyosmium-get-changes.\n"; - fail('pyosmium-get-changes not found or not usable'); - } - - if (!$aResult['no-update-functions']) { - (clone($oNominatimCmd))->addParams('refresh', '--functions')->run(); - } - - $sDatabaseDate = getDatabaseDate($oDB); - if (!$sDatabaseDate) { - fail('Cannot determine date of database.'); - } - $sWindBack = strftime('%Y-%m-%dT%H:%M:%SZ', strtotime($sDatabaseDate) - (3*60*60)); - - // get the appropriate state id - $aOutput = 0; - $oCMD = (new \Nominatim\Shell($sPyosmiumBin)) - ->addParams('--start-date', $sWindBack) - ->addParams('--server', $sBaseURL); - - exec($oCMD->escapedCmd(), $aOutput, $iRet); - if ($iRet != 0 || $aOutput[0] == 'None') { - fail('Error running pyosmium tools'); - } - - $oDB->exec('TRUNCATE import_status'); - $sSQL = "INSERT INTO import_status (lastimportdate, sequence_id, indexed) VALUES('"; - $sSQL .= $sDatabaseDate."',".$aOutput[0].', true)'; + $oCmd = (clone($oNominatimCmd))->addParams('replication', '--init'); - try { - $oDB->exec($sSQL); - } catch (\Nominatim\DatabaseError $e) { - fail('Could not enter sequence into database.'); + if ($aResult['no-update-functions']) { + $oCmd->addParams('--no-update-functions'); } - echo "Done. Database updates will start at sequence $aOutput[0] ($sWindBack)\n"; + $oCmd->run(); } if ($aResult['check-for-updates']) { diff --git a/nominatim/cli.py b/nominatim/cli.py index 4b38040c..a9ffef33 100644 --- a/nominatim/cli.py +++ b/nominatim/cli.py @@ -234,12 +234,29 @@ class UpdateReplication: @staticmethod def run(args): + try: + import osmium # pylint: disable=W0611 + except ModuleNotFoundError: + LOG.fatal("pyosmium not installed. Replication functions not available.\n" + "To install pyosmium via pip: pip3 install osmium") + return 1 + + from .tools import replication, refresh + + conn = connect(args.config.get_libpq_dsn()) + params = ['update.php'] if args.init: - params.append('--init-updates') - if not args.update_functions: - params.append('--no-update-functions') - elif args.check_for_updates: + LOG.warning("Initialising replication updates") + replication.init_replication(conn, args.config.REPLICATION_URL) + if args.update_functions: + LOG.warning("Create functions") + refresh.create_functions(conn, args.config, args.data_dir, + True, False) + conn.close() + return 0 + + if args.check_for_updates: params.append('--check-for-updates') else: if args.once: diff --git a/nominatim/db/connection.py b/nominatim/db/connection.py new file mode 100644 index 00000000..8e75d7a2 --- /dev/null +++ b/nominatim/db/connection.py @@ -0,0 +1,58 @@ +""" +Specialised connection and cursor functions. +""" +import logging + +import psycopg2 +import psycopg2.extensions +import psycopg2.extras + +class _Cursor(psycopg2.extras.DictCursor): + """ A cursor returning dict-like objects and providing specialised + execution functions. + """ + + def execute(self, query, args=None): # pylint: disable=W0221 + """ Query execution that logs the SQL query when debugging is enabled. + """ + logger = logging.getLogger() + logger.debug(self.mogrify(query, args).decode('utf-8')) + + super().execute(query, args) + + def scalar(self, sql, args=None): + """ Execute query that returns a single value. The value is returned. + If the query yields more than one row, a ValueError is raised. + """ + self.execute(sql, args) + + if self.rowcount != 1: + raise ValueError("Query did not return a single row.") + + return self.fetchone()[0] + + +class _Connection(psycopg2.extensions.connection): + """ A connection that provides the specialised cursor by default and + adds convenience functions for administrating the database. + """ + + def cursor(self, cursor_factory=_Cursor, **kwargs): + """ Return a new cursor. By default the specialised cursor is returned. + """ + return super().cursor(cursor_factory=cursor_factory, **kwargs) + + def table_exists(self, table): + """ Check that a table with the given name exists in the database. + """ + with self.cursor() as cur: + num = cur.scalar("""SELECT count(*) FROM pg_tables + WHERE tablename = %s""", (table, )) + return num == 1 + + +def connect(dsn): + """ Open a connection to the database using the specialised connection + factory. + """ + return psycopg2.connect(dsn, connection_factory=_Connection) diff --git a/nominatim/db/status.py b/nominatim/db/status.py new file mode 100644 index 00000000..af4b85c3 --- /dev/null +++ b/nominatim/db/status.py @@ -0,0 +1,50 @@ +""" +Access and helper functions for the status table. +""" +import datetime as dt +import logging +import re + +from ..tools.exec_utils import get_url + +LOG = logging.getLogger() + +def compute_database_date(conn): + """ Determine the date of the database from the newest object in the + data base. + """ + # First, find the node with the highest ID in the database + with conn.cursor() as cur: + osmid = cur.scalar("SELECT max(osm_id) FROM place WHERE osm_type='N'") + + if osmid is None: + LOG.fatal("No data found in the database.") + raise RuntimeError("No data found in the database.") + + LOG.info("Using node id %d for timestamp lookup", osmid) + # Get the node from the API to find the timestamp when it was created. + node_url = 'https://www.openstreetmap.org/api/0.6/node/{}/1'.format(osmid) + data = get_url(node_url) + + match = re.search(r'timestamp="((\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2}))Z"', data) + + if match is None: + LOG.fatal("The node data downloaded from the API does not contain valid data.\n" + "URL used: %s", node_url) + raise RuntimeError("Bad API data.") + + LOG.debug("Found timestamp %s", match[1]) + + return dt.datetime.fromisoformat(match[1]).replace(tzinfo=dt.timezone.utc) + + +def set_status(conn, date, seq=None, indexed=True): + """ Replace the current status with the given status. + """ + assert date.tzinfo == dt.timezone.utc + with conn.cursor() as cur: + cur.execute("TRUNCATE TABLE import_status") + cur.execute("""INSERT INTO import_status (lastimportdate, sequence_id, indexed) + VALUES (%s, %s, %s)""", (date, seq, indexed)) + + conn.commit() diff --git a/nominatim/tools/exec_utils.py b/nominatim/tools/exec_utils.py index 9e16e293..0d3db204 100644 --- a/nominatim/tools/exec_utils.py +++ b/nominatim/tools/exec_utils.py @@ -3,8 +3,13 @@ Helper functions for executing external programs. """ import logging import subprocess +import urllib.request as urlrequest from urllib.parse import urlencode +from ..version import NOMINATIM_VERSION + +LOG = logging.getLogger() + def run_legacy_script(script, *args, nominatim_env=None, throw_on_fail=False): """ Run a Nominatim PHP script with the given arguments. @@ -80,3 +85,16 @@ def run_api_script(endpoint, project_dir, extra_env=None, phpcgi_bin=None, print(result[content_start + 4:].replace('\\n', '\n')) return 0 + + +def get_url(url): + """ Get the contents from the given URL and return it as a UTF-8 string. + """ + headers = {"User-Agent" : "Nominatim/" + NOMINATIM_VERSION} + + try: + with urlrequest.urlopen(urlrequest.Request(url, headers=headers)) as response: + return response.read().decode('utf-8') + except: + LOG.fatal('Failed to load URL: %s', url) + raise diff --git a/nominatim/tools/replication.py b/nominatim/tools/replication.py new file mode 100644 index 00000000..86405168 --- /dev/null +++ b/nominatim/tools/replication.py @@ -0,0 +1,34 @@ +""" +Functions for updating a database from a replication source. +""" +import datetime +import logging + +from osmium.replication.server import ReplicationServer + +from ..db import status + +LOG = logging.getLogger() + +def init_replication(conn, base_url): + """ Set up replication for the server at the given base URL. + """ + LOG.info("Using replication source: %s", base_url) + date = status.compute_database_date(conn) + + # margin of error to make sure we get all data + date -= datetime.timedelta(hours=3) + + repl = ReplicationServer(base_url) + + seq = repl.timestamp_to_sequence(date) + + if seq is None: + LOG.fatal("Cannot reach the configured replication service '%s'.\n" + "Does the URL point to a directory containing OSM update data?", + base_url) + raise RuntimeError("Failed to reach replication service") + + status.set_status(conn, date=date, seq=seq) + + LOG.warning("Updates intialised at sequence %s (%s)", seq, date) diff --git a/nominatim/version.py b/nominatim/version.py new file mode 100644 index 00000000..a2ddc9fa --- /dev/null +++ b/nominatim/version.py @@ -0,0 +1,5 @@ +""" +Version information for Nominatim. +""" + +NOMINATIM_VERSION = "3.6.0" diff --git a/settings/env.defaults b/settings/env.defaults index fbad3e33..9fefc622 100644 --- a/settings/env.defaults +++ b/settings/env.defaults @@ -57,6 +57,9 @@ NOMINATIM_HTTP_PROXY_HOST=proxy.mydomain.com NOMINATIM_HTTP_PROXY_PORT=3128 NOMINATIM_HTTP_PROXY_LOGIN= NOMINATIM_HTTP_PROXY_PASSWORD= +# Also set these standard environment variables. +# HTTP_PROXY="http://user:pass@10.10.1.10:1080" +# HTTPS_PROXY="http://user:pass@10.10.1.10:1080" # Location of the osm2pgsql binary. # When empty, osm2pgsql is expected to reside in the osm2pgsql directory in diff --git a/sql/tables.sql b/sql/tables.sql index 5686bcd2..8647e304 100644 --- a/sql/tables.sql +++ b/sql/tables.sql @@ -1,6 +1,6 @@ drop table if exists import_status; CREATE TABLE import_status ( - lastimportdate timestamp NOT NULL, + lastimportdate timestamp with time zone NOT NULL, sequence_id integer, indexed boolean ); diff --git a/test/python/conftest.py b/test/python/conftest.py index d92df5c5..2e81e919 100644 --- a/test/python/conftest.py +++ b/test/python/conftest.py @@ -1,3 +1,4 @@ +import itertools import sys from pathlib import Path @@ -11,6 +12,7 @@ SRC_DIR = Path(__file__) / '..' / '..' / '..' sys.path.insert(0, str(SRC_DIR.resolve())) from nominatim.config import Configuration +from nominatim.db import connection class _TestingCursor(psycopg2.extras.DictCursor): """ Extension to the DictCursor class that provides execution @@ -40,27 +42,42 @@ def temp_db(monkeypatch): exported into NOMINATIM_DATABASE_DSN. """ name = 'test_nominatim_python_unittest' - with psycopg2.connect(database='postgres') as conn: - conn.set_isolation_level(0) - with conn.cursor() as cur: - cur.execute('DROP DATABASE IF EXISTS {}'.format(name)) - cur.execute('CREATE DATABASE {}'.format(name)) + conn = psycopg2.connect(database='postgres') + + conn.set_isolation_level(0) + with conn.cursor() as cur: + cur.execute('DROP DATABASE IF EXISTS {}'.format(name)) + cur.execute('CREATE DATABASE {}'.format(name)) + + conn.close() monkeypatch.setenv('NOMINATIM_DATABASE_DSN' , 'dbname=' + name) yield name - with psycopg2.connect(database='postgres') as conn: - conn.set_isolation_level(0) - with conn.cursor() as cur: - cur.execute('DROP DATABASE IF EXISTS {}'.format(name)) + conn = psycopg2.connect(database='postgres') + conn.set_isolation_level(0) + with conn.cursor() as cur: + cur.execute('DROP DATABASE IF EXISTS {}'.format(name)) + + conn.close() + +@pytest.fixture +def temp_db_with_extensions(temp_db): + conn = psycopg2.connect(database=temp_db) + with conn.cursor() as cur: + cur.execute('CREATE EXTENSION hstore; CREATE EXTENSION postgis;') + conn.commit() + conn.close() + + return temp_db @pytest.fixture def temp_db_conn(temp_db): """ Connection to the test database. """ - conn = psycopg2.connect(database=temp_db) + conn = connection.connect('dbname=' + temp_db) yield conn conn.close() @@ -80,3 +97,50 @@ def temp_db_cursor(temp_db): @pytest.fixture def def_config(): return Configuration(None, SRC_DIR.resolve() / 'settings') + + +@pytest.fixture +def status_table(temp_db_conn): + """ Create an empty version of the status table. + """ + with temp_db_conn.cursor() as cur: + cur.execute("""CREATE TABLE import_status ( + lastimportdate timestamp with time zone NOT NULL, + sequence_id integer, + indexed boolean + )""") + temp_db_conn.commit() + + +@pytest.fixture +def place_table(temp_db_with_extensions, temp_db_conn): + """ Create an empty version of the place table. + """ + with temp_db_conn.cursor() as cur: + cur.execute("""CREATE TABLE place ( + osm_id int8 NOT NULL, + osm_type char(1) NOT NULL, + class text NOT NULL, + type text NOT NULL, + name hstore, + admin_level smallint, + address hstore, + extratags hstore, + geometry Geometry(Geometry,4326) NOT NULL)""") + temp_db_conn.commit() + + +@pytest.fixture +def place_row(place_table, temp_db_cursor): + """ A factory for rows in the place table. The table is created as a + prerequisite to the fixture. + """ + idseq = itertools.count(1001) + def _insert(osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None, + admin_level=None, address=None, extratags=None, geom=None): + temp_db_cursor.execute("INSERT INTO place VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)", + (osm_id or next(idseq), osm_type, cls, typ, names, + admin_level, address, extratags, + geom or 'SRID=4326;POINT(0 0 )')) + + return _insert diff --git a/test/python/test_cli.py b/test/python/test_cli.py index 942a9409..ed46eba3 100644 --- a/test/python/test_cli.py +++ b/test/python/test_cli.py @@ -7,6 +7,7 @@ import pytest import nominatim.cli import nominatim.indexer.indexer import nominatim.tools.refresh +import nominatim.tools.replication def call_nominatim(*args): return nominatim.cli.nominatim(module_dir='build/module', @@ -56,7 +57,6 @@ def test_cli_help(capsys): (('import', '--continue', 'load-data'), 'setup'), (('freeze',), 'setup'), (('special-phrases',), 'specialphrases'), - (('replication',), 'update'), (('add-data', '--tiger-data', 'tiger'), 'setup'), (('add-data', '--file', 'foo.osm'), 'update'), (('check-database',), 'check_import_finished'), @@ -102,7 +102,7 @@ def test_index_command(monkeypatch, temp_db_cursor, params, do_bnds, do_ranks): ('importance', ('update.php', '--recompute-importance')), ('website', ('setup.php', '--setup-website')), ]) -def test_refresh_legacy_command(mock_run_legacy, command, params): +def test_refresh_legacy_command(mock_run_legacy, temp_db, command, params): assert 0 == call_nominatim('refresh', '--' + command) assert mock_run_legacy.called == 1 @@ -115,7 +115,7 @@ def test_refresh_legacy_command(mock_run_legacy, command, params): ('address-levels', 'load_address_levels_from_file'), ('functions', 'create_functions'), ]) -def test_refresh_command(monkeypatch, command, func): +def test_refresh_command(monkeypatch, temp_db, command, func): func_mock = MockParamCapture() monkeypatch.setattr(nominatim.tools.refresh, func, func_mock) @@ -123,12 +123,26 @@ def test_refresh_command(monkeypatch, command, func): assert func_mock.called == 1 -def test_refresh_importance_computed_after_wiki_import(mock_run_legacy): + +def test_refresh_importance_computed_after_wiki_import(mock_run_legacy, temp_db): assert 0 == call_nominatim('refresh', '--importance', '--wiki-data') assert mock_run_legacy.called == 2 assert mock_run_legacy.last_args == ('update.php', '--recompute-importance') + +@pytest.mark.parametrize("params,func", [ + (('--init', '--no-update-functions'), 'init_replication') + ]) +def test_replication_command(monkeypatch, temp_db, params, func): + func_mock = MockParamCapture() + monkeypatch.setattr(nominatim.tools.replication, func, func_mock) + + assert 0 == call_nominatim('replication', *params) + + assert func_mock.called == 1 + + @pytest.mark.parametrize("params", [ ('search', '--query', 'new'), ('reverse', '--lat', '0', '--lon', '0'), diff --git a/test/python/test_db_status.py b/test/python/test_db_status.py new file mode 100644 index 00000000..d73b099e --- /dev/null +++ b/test/python/test_db_status.py @@ -0,0 +1,74 @@ +""" +Tests for status table manipulation. +""" +import datetime as dt + +import pytest + +import nominatim.db.status + +def test_compute_database_date_place_empty(status_table, place_table, temp_db_conn): + with pytest.raises(RuntimeError): + nominatim.db.status.compute_database_date(temp_db_conn) + +OSM_NODE_DATA = """\ + + + + +""" + +def test_compute_database_date_valid(monkeypatch, status_table, place_row, temp_db_conn): + place_row(osm_type='N', osm_id=45673) + + requested_url = [] + def mock_url(url): + requested_url.append(url) + return OSM_NODE_DATA + + monkeypatch.setattr(nominatim.db.status, "get_url", mock_url) + + date = nominatim.db.status.compute_database_date(temp_db_conn) + + assert requested_url == ['https://www.openstreetmap.org/api/0.6/node/45673/1'] + assert date == dt.datetime.fromisoformat('2006-01-27T22:09:10').replace(tzinfo=dt.timezone.utc) + + +def test_compute_database_broken_api(monkeypatch, status_table, place_row, temp_db_conn): + place_row(osm_type='N', osm_id=45673) + + requested_url = [] + def mock_url(url): + requested_url.append(url) + return ' + + + +""" + + +def test_init_replication_bad_base_url(monkeypatch, status_table, place_row, temp_db_conn, temp_db_cursor): + place_row(osm_type='N', osm_id=100) + + monkeypatch.setattr(nominatim.db.status, "get_url", lambda u : OSM_NODE_DATA) + + with pytest.raises(RuntimeError, match="Failed to reach replication service"): + nominatim.tools.replication.init_replication(temp_db_conn, 'https://test.io') + + +def test_init_replication_success(monkeypatch, status_table, place_row, temp_db_conn, temp_db_cursor): + place_row(osm_type='N', osm_id=100) + + monkeypatch.setattr(nominatim.db.status, "get_url", lambda u : OSM_NODE_DATA) + monkeypatch.setattr(nominatim.tools.replication.ReplicationServer, + "timestamp_to_sequence", + lambda self, date: 234) + + nominatim.tools.replication.init_replication(temp_db_conn, 'https://test.io') + + temp_db_cursor.execute("SELECT * FROM import_status") + + expected_date = dt.datetime.fromisoformat('2006-01-27T19:09:10').replace(tzinfo=dt.timezone.utc) + assert temp_db_cursor.rowcount == 1 + assert temp_db_cursor.fetchone() == [expected_date, 234, True]