Use sql path from config.
and follows its syntax.
"""
- def __init__(self, conn, config, sqllib_dir):
+ def __init__(self, conn, config):
self.env = jinja2.Environment(autoescape=False,
- loader=jinja2.FileSystemLoader(str(sqllib_dir)))
+ loader=jinja2.FileSystemLoader(str(config.lib_dir.sql)))
db_info = {}
db_info['partitions'] = _get_partitions(conn)
When `reverse_only` is True, then the main table for searching will
be skipped and only reverse search is possible.
"""
- sql = SQLPreprocessor(conn, config, sqllib_dir)
+ sql = SQLPreprocessor(conn, config)
sql.env.globals['db']['reverse_only'] = reverse_only
sql.run_sql_file(conn, 'tables.sql')
""" Create the triggers for the tables. The trigger functions must already
have been imported with refresh.create_functions().
"""
- sql = SQLPreprocessor(conn, config, sqllib_dir)
+ sql = SQLPreprocessor(conn, config)
sql.run_sql_file(conn, 'table-triggers.sql')
def create_partition_tables(conn, config, sqllib_dir):
""" Create tables that have explicit partitioning.
"""
- sql = SQLPreprocessor(conn, config, sqllib_dir)
+ sql = SQLPreprocessor(conn, config)
sql.run_sql_file(conn, 'partition-tables.src.sql')
cur.execute('DROP INDEX "{}"'.format(idx))
conn.commit()
- sql = SQLPreprocessor(conn, config, sqllib_dir)
+ sql = SQLPreprocessor(conn, config)
sql.run_sql_file(conn, 'indices.sql', drop=drop)
enable_diff_updates=True, enable_debug=False):
""" (Re)create the PL/pgSQL functions.
"""
- sql = SQLPreprocessor(conn, config, sqllib_dir)
+ sql = SQLPreprocessor(conn, config)
sql.run_sql_file(conn, 'functions.sql',
disable_diff_updates=not enable_diff_updates,
return
with connect(dsn) as conn:
- sql = SQLPreprocessor(conn, config, config.lib_dir.sql)
+ sql = SQLPreprocessor(conn, config)
sql.run_sql_file(conn, 'tiger_import_start.sql')
# Reading sql_files and then for each file line handling
print('\n')
LOG.warning("Creating indexes on Tiger data")
with connect(dsn) as conn:
- sql = SQLPreprocessor(conn, config, config.lib_dir.sql)
+ sql = SQLPreprocessor(conn, config)
sql.run_sql_file(conn, 'tiger_import_finish.sql')
main_data='', main_index=''))
@pytest.fixture
-def sql_preprocessor(temp_db_conn, tmp_path, def_config, monkeypatch, table_factory):
+def sql_preprocessor(temp_db_conn, tmp_path, monkeypatch, table_factory):
monkeypatch.setenv('NOMINATIM_DATABASE_MODULE_PATH', '.')
table_factory('country_name', 'partition INT', (0, 1, 2))
- return SQLPreprocessor(temp_db_conn, def_config, tmp_path)
+ cfg = Configuration(None, SRC_DIR.resolve() / 'settings')
+ cfg.set_libdirs(module='.', osm2pgsql='.', php=SRC_DIR / 'lib-php',
+ sql=tmp_path, data=SRC_DIR / 'data')
+
+ return SQLPreprocessor(temp_db_conn, cfg)
from nominatim.tools.refresh import create_functions
+@pytest.fixture
+def sql_tmp_path(tmp_path, def_config):
+ def_config.lib_dir.sql = tmp_path
+ return tmp_path
+
@pytest.fixture
def conn(temp_db_conn, table_factory, monkeypatch):
monkeypatch.setenv('NOMINATIM_DATABASE_MODULE_PATH', '.')
return temp_db_conn
-def test_create_functions(temp_db_cursor, conn, def_config, tmp_path):
- sqlfile = tmp_path / 'functions.sql'
+def test_create_functions(temp_db_cursor, conn, def_config, sql_tmp_path):
+ sqlfile = sql_tmp_path / 'functions.sql'
sqlfile.write_text("""CREATE OR REPLACE FUNCTION test() RETURNS INTEGER
AS $$
BEGIN
$$ LANGUAGE plpgsql IMMUTABLE;
""")
- create_functions(conn, def_config, tmp_path)
+ create_functions(conn, def_config, sql_tmp_path)
assert temp_db_cursor.scalar('SELECT test()') == 43
@pytest.mark.parametrize("dbg,ret", ((True, 43), (False, 22)))
-def test_create_functions_with_template(temp_db_cursor, conn, def_config, tmp_path, dbg, ret):
- sqlfile = tmp_path / 'functions.sql'
+def test_create_functions_with_template(temp_db_cursor, conn, def_config, sql_tmp_path, dbg, ret):
+ sqlfile = sql_tmp_path / 'functions.sql'
sqlfile.write_text("""CREATE OR REPLACE FUNCTION test() RETURNS INTEGER
AS $$
BEGIN
$$ LANGUAGE plpgsql IMMUTABLE;
""")
- create_functions(conn, def_config, tmp_path, enable_debug=dbg)
+ create_functions(conn, def_config, sql_tmp_path, enable_debug=dbg)
assert temp_db_cursor.scalar('SELECT test()') == ret