X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/94fa7162be678dc74c1a5516e22916122ba66bbb..24c986c842a20e85961706c0f5bf4b0ce3d7e478:/test/python/conftest.py diff --git a/test/python/conftest.py b/test/python/conftest.py index d92df5c5..9a43a67e 100644 --- a/test/python/conftest.py +++ b/test/python/conftest.py @@ -1,8 +1,8 @@ +import itertools import sys from pathlib import Path import psycopg2 -import psycopg2.extras import pytest SRC_DIR = Path(__file__) / '..' / '..' / '..' @@ -11,28 +11,15 @@ SRC_DIR = Path(__file__) / '..' / '..' / '..' sys.path.insert(0, str(SRC_DIR.resolve())) from nominatim.config import Configuration +from nominatim.db import connection +from nominatim.db.sql_preprocessor import SQLPreprocessor +import nominatim.tokenizer.factory +import nominatim.cli -class _TestingCursor(psycopg2.extras.DictCursor): - """ Extension to the DictCursor class that provides execution - short-cuts that simplify writing assertions. - """ - - def scalar(self, sql, params=None): - """ Execute a query with a single return value and return this value. - Raises an assertion when not exactly one row is returned. - """ - self.execute(sql, params) - assert self.rowcount == 1 - return self.fetchone()[0] - - def row_set(self, sql, params=None): - """ Execute a query and return the result as a set of tuples. - """ - self.execute(sql, params) - if self.rowcount == 1: - return set(tuple(self.fetchone())) +import dummy_tokenizer +import mocks +from cursor import CursorForTesting - return set((tuple(row) for row in self)) @pytest.fixture def temp_db(monkeypatch): @@ -40,29 +27,49 @@ def temp_db(monkeypatch): exported into NOMINATIM_DATABASE_DSN. """ name = 'test_nominatim_python_unittest' - with psycopg2.connect(database='postgres') as conn: - conn.set_isolation_level(0) - with conn.cursor() as cur: - cur.execute('DROP DATABASE IF EXISTS {}'.format(name)) - cur.execute('CREATE DATABASE {}'.format(name)) + conn = psycopg2.connect(database='postgres') + + conn.set_isolation_level(0) + with conn.cursor() as cur: + cur.execute('DROP DATABASE IF EXISTS {}'.format(name)) + cur.execute('CREATE DATABASE {}'.format(name)) - monkeypatch.setenv('NOMINATIM_DATABASE_DSN' , 'dbname=' + name) + conn.close() + + monkeypatch.setenv('NOMINATIM_DATABASE_DSN', 'dbname=' + name) yield name - with psycopg2.connect(database='postgres') as conn: - conn.set_isolation_level(0) - with conn.cursor() as cur: - cur.execute('DROP DATABASE IF EXISTS {}'.format(name)) + conn = psycopg2.connect(database='postgres') + + conn.set_isolation_level(0) + with conn.cursor() as cur: + cur.execute('DROP DATABASE IF EXISTS {}'.format(name)) + + conn.close() + + +@pytest.fixture +def dsn(temp_db): + return 'dbname=' + temp_db + + +@pytest.fixture +def temp_db_with_extensions(temp_db): + conn = psycopg2.connect(database=temp_db) + with conn.cursor() as cur: + cur.execute('CREATE EXTENSION hstore; CREATE EXTENSION postgis;') + conn.commit() + conn.close() + return temp_db @pytest.fixture def temp_db_conn(temp_db): """ Connection to the test database. """ - conn = psycopg2.connect(database=temp_db) - yield conn - conn.close() + with connection.connect('dbname=' + temp_db) as conn: + yield conn @pytest.fixture @@ -72,11 +79,177 @@ def temp_db_cursor(temp_db): """ conn = psycopg2.connect('dbname=' + temp_db) conn.set_isolation_level(0) - with conn.cursor(cursor_factory=_TestingCursor) as cur: + with conn.cursor(cursor_factory=CursorForTesting) as cur: yield cur conn.close() +@pytest.fixture +def table_factory(temp_db_cursor): + """ A fixture that creates new SQL tables, potentially filled with + content. + """ + def mk_table(name, definition='id INT', content=None): + temp_db_cursor.execute('CREATE TABLE {} ({})'.format(name, definition)) + if content is not None: + temp_db_cursor.execute_values("INSERT INTO {} VALUES %s".format(name), content) + + return mk_table + + @pytest.fixture def def_config(): - return Configuration(None, SRC_DIR.resolve() / 'settings') + cfg = Configuration(None, SRC_DIR.resolve() / 'settings') + cfg.set_libdirs(module='.', osm2pgsql='.', + php=SRC_DIR / 'lib-php', + sql=SRC_DIR / 'lib-sql', + data=SRC_DIR / 'data') + return cfg + + +@pytest.fixture +def src_dir(): + return SRC_DIR.resolve() + + +@pytest.fixture +def cli_call(): + def _call_nominatim(*args): + return nominatim.cli.nominatim(module_dir='MODULE NOT AVAILABLE', + osm2pgsql_path='OSM2PGSQL NOT AVAILABLE', + phplib_dir=str(SRC_DIR / 'lib-php'), + data_dir=str(SRC_DIR / 'data'), + phpcgi_path='/usr/bin/php-cgi', + sqllib_dir=str(SRC_DIR / 'lib-sql'), + config_dir=str(SRC_DIR / 'settings'), + cli_args=args) + + return _call_nominatim + + +@pytest.fixture +def property_table(table_factory, temp_db_conn): + table_factory('nominatim_properties', 'property TEXT, value TEXT') + + return mocks.MockPropertyTable(temp_db_conn) + + +@pytest.fixture +def status_table(table_factory): + """ Create an empty version of the status table and + the status logging table. + """ + table_factory('import_status', + """lastimportdate timestamp with time zone NOT NULL, + sequence_id integer, + indexed boolean""") + table_factory('import_osmosis_log', + """batchend timestamp, + batchseq integer, + batchsize bigint, + starttime timestamp, + endtime timestamp, + event text""") + + +@pytest.fixture +def place_table(temp_db_with_extensions, table_factory): + """ Create an empty version of the place table. + """ + table_factory('place', + """osm_id int8 NOT NULL, + osm_type char(1) NOT NULL, + class text NOT NULL, + type text NOT NULL, + name hstore, + admin_level smallint, + address hstore, + extratags hstore, + geometry Geometry(Geometry,4326) NOT NULL""") + + +@pytest.fixture +def place_row(place_table, temp_db_cursor): + """ A factory for rows in the place table. The table is created as a + prerequisite to the fixture. + """ + idseq = itertools.count(1001) + def _insert(osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None, + admin_level=None, address=None, extratags=None, geom=None): + temp_db_cursor.execute("INSERT INTO place VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)", + (osm_id or next(idseq), osm_type, cls, typ, names, + admin_level, address, extratags, + geom or 'SRID=4326;POINT(0 0)')) + + return _insert + +@pytest.fixture +def placex_table(temp_db_with_extensions, temp_db_conn): + """ Create an empty version of the place table. + """ + return mocks.MockPlacexTable(temp_db_conn) + + +@pytest.fixture +def osmline_table(temp_db_with_extensions, table_factory): + table_factory('location_property_osmline', + """place_id BIGINT, + osm_id BIGINT, + parent_place_id BIGINT, + geometry_sector INTEGER, + indexed_date TIMESTAMP, + startnumber INTEGER, + endnumber INTEGER, + partition SMALLINT, + indexed_status SMALLINT, + linegeo GEOMETRY, + interpolationtype TEXT, + address HSTORE, + postcode TEXT, + country_code VARCHAR(2)""") + + +@pytest.fixture +def word_table(temp_db_conn): + return mocks.MockWordTable(temp_db_conn) + + +@pytest.fixture +def osm2pgsql_options(temp_db): + return dict(osm2pgsql='echo', + osm2pgsql_cache=10, + osm2pgsql_style='style.file', + threads=1, + dsn='dbname=' + temp_db, + flatnode_file='', + tablespaces=dict(slim_data='', slim_index='', + main_data='', main_index='')) + +@pytest.fixture +def sql_preprocessor(temp_db_conn, tmp_path, table_factory): + table_factory('country_name', 'partition INT', ((0, ), (1, ), (2, ))) + cfg = Configuration(None, SRC_DIR.resolve() / 'settings') + cfg.set_libdirs(module='.', osm2pgsql='.', php=SRC_DIR / 'lib-php', + sql=tmp_path, data=SRC_DIR / 'data') + + return SQLPreprocessor(temp_db_conn, cfg) + + +@pytest.fixture +def tokenizer_mock(monkeypatch, property_table): + """ Sets up the configuration so that the test dummy tokenizer will be + loaded when the tokenizer factory is used. Also returns a factory + with which a new dummy tokenizer may be created. + """ + monkeypatch.setenv('NOMINATIM_TOKENIZER', 'dummy') + + def _import_dummy(*args, **kwargs): + return dummy_tokenizer + + monkeypatch.setattr(nominatim.tokenizer.factory, "_import_tokenizer", _import_dummy) + property_table.set('tokenizer', 'dummy') + + def _create_tokenizer(): + return dummy_tokenizer.DummyTokenizer(None, None) + + return _create_tokenizer