if ($aCMDResult['import-tiger-data']) {
$bDidSomething = true;
$sTigerPath = getSetting('TIGER_DATA_PATH', CONST_InstallDir.'/tiger');
- $oSetup->importTigerData($sTigerPath);
+ run((clone($oNominatimCmd))->addParams('transition', '--tiger-data', $sTigerPath));
}
if ($aCMDResult['calculate-postcodes'] || $aCMDResult['all']) {
}
}
- public function importTigerData($sTigerPath)
- {
- info('Import Tiger data');
-
- $aFilenames = glob($sTigerPath.'/*.sql');
- info('Found '.count($aFilenames).' SQL files in path '.$sTigerPath);
- if (empty($aFilenames)) {
- warn('Tiger data import selected but no files found in path '.$sTigerPath);
- return;
- }
- $sTemplate = file_get_contents(CONST_SqlDir.'/tiger_import_start.sql');
- $sTemplate = $this->replaceSqlPatterns($sTemplate);
-
- $this->pgsqlRunScript($sTemplate, false);
-
- $aDBInstances = array();
- for ($i = 0; $i < $this->iInstances; $i++) {
- // https://secure.php.net/manual/en/function.pg-connect.php
- $DSN = getSetting('DATABASE_DSN');
- $DSN = preg_replace('/^pgsql:/', '', $DSN);
- $DSN = preg_replace('/;/', ' ', $DSN);
- $aDBInstances[$i] = pg_connect($DSN, PGSQL_CONNECT_FORCE_NEW | PGSQL_CONNECT_ASYNC);
- pg_ping($aDBInstances[$i]);
- }
-
- foreach ($aFilenames as $sFile) {
- echo $sFile.': ';
- $hFile = fopen($sFile, 'r');
- $sSQL = fgets($hFile, 100000);
- $iLines = 0;
- while (true) {
- for ($i = 0; $i < $this->iInstances; $i++) {
- if (!pg_connection_busy($aDBInstances[$i])) {
- while (pg_get_result($aDBInstances[$i]));
- $sSQL = fgets($hFile, 100000);
- if (!$sSQL) break 2;
- if (!pg_send_query($aDBInstances[$i], $sSQL)) fail(pg_last_error($aDBInstances[$i]));
- $iLines++;
- if ($iLines == 1000) {
- echo '.';
- $iLines = 0;
- }
- }
- }
- usleep(10);
- }
- fclose($hFile);
-
- $bAnyBusy = true;
- while ($bAnyBusy) {
- $bAnyBusy = false;
- for ($i = 0; $i < $this->iInstances; $i++) {
- if (pg_connection_busy($aDBInstances[$i])) $bAnyBusy = true;
- }
- usleep(10);
- }
- echo "\n";
- }
-
- for ($i = 0; $i < $this->iInstances; $i++) {
- pg_close($aDBInstances[$i]);
- }
-
- info('Creating indexes on Tiger data');
- $sTemplate = file_get_contents(CONST_SqlDir.'/tiger_import_finish.sql');
- $sTemplate = $this->replaceSqlPatterns($sTemplate);
-
- $this->pgsqlRunScript($sTemplate, false);
- }
-
public function calculatePostcodes($bCMDResultAll)
{
info('Calculate Postcodes');
--index only on parent_place_id
-CREATE INDEX idx_location_property_tiger_parent_place_id_imp ON location_property_tiger_import (parent_place_id) {ts:aux-index};
-CREATE UNIQUE INDEX idx_location_property_tiger_place_id_imp ON location_property_tiger_import (place_id) {ts:aux-index};
--index only on parent_place_id
-- Fix: this index was accidentally given the same name as the unique
-- place_id index below; with if_index_not_exists the duplicate would be
-- skipped and the later "ALTER INDEX IF EXISTS ..._parent_place_id_imp
-- RENAME" would silently do nothing.
CREATE INDEX {{sql.if_index_not_exists}} idx_location_property_tiger_parent_place_id_imp
  ON location_property_tiger_import (parent_place_id) {{db.tablespace.aux_index}};
+CREATE UNIQUE INDEX {{sql.if_index_not_exists}} idx_location_property_tiger_place_id_imp
+ ON location_property_tiger_import (place_id) {{db.tablespace.aux_index}};
-GRANT SELECT ON location_property_tiger_import TO "{www-user}";
+GRANT SELECT ON location_property_tiger_import TO "{{config.DATABASE_WEBUSER}}";
DROP TABLE IF EXISTS location_property_tiger;
ALTER TABLE location_property_tiger_import RENAME TO location_property_tiger;
-ALTER INDEX idx_location_property_tiger_parent_place_id_imp RENAME TO idx_location_property_tiger_housenumber_parent_place_id;
-ALTER INDEX idx_location_property_tiger_place_id_imp RENAME TO idx_location_property_tiger_place_id;
+ALTER INDEX IF EXISTS idx_location_property_tiger_parent_place_id_imp RENAME TO idx_location_property_tiger_housenumber_parent_place_id;
+ALTER INDEX IF EXISTS idx_location_property_tiger_place_id_imp RENAME TO idx_location_property_tiger_place_id;
DROP FUNCTION tiger_line_import (linegeo geometry, in_startnumber integer, in_endnumber integer, interpolationtype text, in_street text, in_isin text, in_postcode text);
from .errors import UsageError
from . import clicmd
from .clicmd.args import NominatimArgs
+from .tools import tiger_data
LOG = logging.getLogger()
@staticmethod
def run(args):
if args.tiger_data:
- os.environ['NOMINATIM_TIGER_DATA_PATH'] = args.tiger_data
- return run_legacy_script('setup.php', '--import-tiger-data', nominatim_env=args)
+ return tiger_data.add_tiger_data(args.config.get_libpq_dsn(),
+ args.tiger_data,
+ args.threads or 1,
+ args.config,
+ args.sqllib_dir)
params = ['update.php']
if args.file:
help="Ignore certain erros on import.")
group.add_argument('--reverse-only', action='store_true',
help='Do not create search tables and indexes')
+ group.add_argument('--tiger-data', metavar='FILE',
+ help='File to import')
@staticmethod
def run(args):
- from ..tools import database_import
+ from ..tools import database_import, tiger_data
from ..tools import refresh
if args.create_db:
LOG.warning('Create Search indices')
with connect(args.config.get_libpq_dsn()) as conn:
database_import.create_search_indices(conn, args.config, args.sqllib_dir, args.drop)
+
+ if args.tiger_data:
+ LOG.warning('Tiger data')
+ tiger_data.add_tiger_data(args.config.get_libpq_dsn(),
+ args.tiger_data,
+ args.threads or 1,
+ args.config,
+ args.sqllib_dir)
--- /dev/null
+"""
+Functions for importing tiger data and handling tarball and directory files
+"""
+import logging
+import os
+import tarfile
+import selectors
+
+from ..db.connection import connect
+from ..db.async_connection import DBConnection
+from ..db.sql_preprocessor import SQLPreprocessor
+
+
+LOG = logging.getLogger()
+
+
def handle_tarfile_or_directory(data_dir):
    """ Collect the SQL files to import from either a tarball or a
        plain directory.

        Returns a tuple ``(sql_files, tar)``: ``tar`` is the open tarfile
        object (``None`` when reading from a directory) and ``sql_files``
        the list of tar members resp. file paths to process. Both are
        ``None`` when no SQL files were found.
    """
    tar = None
    if data_dir.endswith('.tar.gz'):
        # Tarball input: members are TarInfo objects, read later
        # via extractfile().
        tar = tarfile.open(data_dir)
        sql_files = [member for member in tar.getmembers()
                     if member.name.endswith('.sql')]
        LOG.warning("Found %d SQL files in tarfile with path %s", len(sql_files), data_dir)
        if not sql_files:
            LOG.warning("Tiger data import selected but no files in tarfile's path %s", data_dir)
            return None, None
    else:
        # Directory input: collect the full paths of all *.sql entries.
        sql_files = [os.path.join(data_dir, name)
                     for name in os.listdir(data_dir)
                     if name.endswith('.sql')]
        LOG.warning("Found %d SQL files in path %s", len(sql_files), data_dir)
        if not sql_files:
            LOG.warning("Tiger data import selected but no files found in path %s", data_dir)
            return None, None

    return sql_files, tar
+
+
def handle_threaded_sql_statements(sel, file):
    """ Fan the SQL statements of ``file`` (one statement per line) out
        over the pool of database connections registered with the
        selector ``sel``.
    """
    sent = 0
    exhausted = False

    # Keep feeding statements to whichever connection becomes ready
    # until the input file runs out of lines.
    while not exhausted:
        for key, _ in sel.select(1):
            worker = key.data
            try:
                if not worker.is_done():
                    continue
                statement = file.readline()
                sent += 1
                if not statement:
                    exhausted = True
                    break
                worker.perform(statement)
                # Print a progress dot roughly every 1000 statements.
                if sent == 1000:
                    print('. ', end='', flush=True)
                    sent = 0
            except Exception as exc: # pylint: disable=broad-except
                LOG.info('Wrong SQL statement: %s', exc)
+
+
def add_tiger_data(dsn, data_dir, threads, config, sqllib_dir):
    """ Import tiger data from directory or tar file.

        Parameters:
            dsn: libpq DSN of the target database.
            data_dir: path to a directory of *.sql files or to a
                      .tar.gz archive containing them.
            threads: total number of threads available; all but one are
                     used for the database connection pool.
            config: configuration object handed to the SQL preprocessor.
            sqllib_dir: directory containing the SQL template files.
    """
    sql_files, tar = handle_tarfile_or_directory(data_dir)

    if not sql_files:
        return

    with connect(dsn) as conn:
        sql = SQLPreprocessor(conn, config, sqllib_dir)
        sql.run_sql_file(conn, 'tiger_import_start.sql')

    # Reading sql_files and then for each file line handling
    # sql_query in <threads - 1> chunks.
    sel = selectors.DefaultSelector()
    place_threads = max(1, threads - 1)

    # Creates a pool of database connections
    for _ in range(place_threads):
        conn = DBConnection(dsn)
        conn.connect()
        sel.register(conn, selectors.EVENT_WRITE, conn)

    for sql_file in sql_files:
        # Directory entries are plain paths; tar members must be
        # extracted from the archive (yields a binary file object).
        file = open(sql_file) if not tar else tar.extractfile(sql_file)
        try:
            handle_threaded_sql_statements(sel, file)
        finally:
            # Fix: the per-file handles were previously never closed,
            # leaking one descriptor per SQL file.
            file.close()

    # Unregistering pool of database connections
    while place_threads > 0:
        for key, _ in sel.select(1):
            conn = key.data
            sel.unregister(conn)
            conn.wait()
            conn.close()
            place_threads -= 1

    if tar:
        tar.close()
    print('\n')
    LOG.warning("Creating indexes on Tiger data")
    with connect(dsn) as conn:
        sql = SQLPreprocessor(conn, config, sqllib_dir)
        sql.run_sql_file(conn, 'tiger_import_finish.sql')
from nominatim.config import Configuration
from nominatim.db import connection
+from nominatim.db.sql_preprocessor import SQLPreprocessor
class _TestingCursor(psycopg2.extras.DictCursor):
""" Extension to the DictCursor class that provides execution
flatnode_file='',
tablespaces=dict(slim_data='', slim_index='',
main_data='', main_index=''))
+
@pytest.fixture
def sql_preprocessor(temp_db_conn, tmp_path, def_config, monkeypatch, table_factory):
    """ Provide a SQLPreprocessor set up against the test database.
    """
    # Point the module path somewhere harmless for template expansion.
    monkeypatch.setenv('NOMINATIM_DATABASE_MODULE_PATH', '.')
    # Supplies the partition values — presumably consumed by the
    # db.partitions template variable; verify against SQLPreprocessor.
    table_factory('country_name', 'partition INT', (0, 1, 2))
    preprocessor = SQLPreprocessor(temp_db_conn, def_config, tmp_path)
    return preprocessor
@pytest.mark.parametrize("command,script", [
(('special-phrases',), 'specialphrases'),
- (('add-data', '--tiger-data', 'tiger'), 'setup'),
(('add-data', '--file', 'foo.osm'), 'update'),
(('export',), 'export')
])
import pytest
-from nominatim.db.sql_preprocessor import SQLPreprocessor
-
@pytest.fixture
def sql_factory(tmp_path):
def _mk_sql(sql_body):
return _mk_sql
-
-@pytest.fixture
-def sql_preprocessor(temp_db_conn, tmp_path, def_config, monkeypatch, table_factory):
- monkeypatch.setenv('NOMINATIM_DATABASE_MODULE_PATH', '.')
- table_factory('country_name', 'partition INT', (0, 1, 2))
- return SQLPreprocessor(temp_db_conn, def_config, tmp_path)
-
@pytest.mark.parametrize("expr,ret", [
("'a'", 'a'),
("'{{db.partitions|join}}'", '012'),
--- /dev/null
+"""
+Test for tiger data function
+"""
+from pathlib import Path
+
+import pytest
+import tarfile
+
+from nominatim.tools import tiger_data, database_import
+
+
@pytest.mark.parametrize("threads", (1, 5))
def test_add_tiger_data(dsn, src_dir, def_config, tmp_path, sql_preprocessor,
                        temp_db_cursor, threads, temp_db):
    """ Importing from a plain directory executes the SQL file found there.
    """
    temp_db_cursor.execute('CREATE EXTENSION hstore')
    temp_db_cursor.execute('CREATE EXTENSION postgis')
    temp_db_cursor.execute('CREATE TABLE place (id INT)')
    sql_file = tmp_path / '1010.sql'
    sql_file.write_text("INSERT INTO place values (1)")

    tiger_data.add_tiger_data(dsn, str(tmp_path), threads, def_config,
                              src_dir / 'lib-sql')

    assert temp_db_cursor.table_rows('place') == 1
+
@pytest.mark.parametrize("threads", (1, 5))
def test_add_tiger_data_tarfile(dsn, src_dir, def_config, tmp_path,
                                temp_db_cursor, threads, temp_db, sql_preprocessor):
    """ Importing from a .tar.gz archive executes the contained SQL files.
    """
    temp_db_cursor.execute('CREATE EXTENSION hstore')
    temp_db_cursor.execute('CREATE EXTENSION postgis')
    temp_db_cursor.execute('CREATE TABLE place (id INT)')
    sqlfile = tmp_path / '1010.sql'
    sqlfile.write_text("""INSERT INTO place values (1)""")
    # Fix: build the tarball inside tmp_path. The previous code wrote
    # 'sample.tar.gz' into the current working directory but read it back
    # from src_dir, which only worked when the tests happened to run from
    # the source root — and it left the file behind in the source tree.
    tarball = tmp_path / 'sample.tar.gz'
    with tarfile.open(str(tarball), 'w:gz') as tar:
        tar.add(str(sqlfile))

    tiger_data.add_tiger_data(dsn, str(tarball), threads, def_config,
                              src_dir / 'lib-sql')

    assert temp_db_cursor.table_rows('place') == 1
\ No newline at end of file