From 3c186f80304b5b66795e9ef3cb9edb8f343c9416 Mon Sep 17 00:00:00 2001 From: Sarah Hoffmann Date: Thu, 25 Feb 2021 11:25:01 +0100 Subject: [PATCH] add a function for the intial indexing run Also moves postcodes to fully parallel indexing. --- lib-php/admin/setup.php | 28 +++++++++--- lib-php/setup/SetupClass.php | 44 ------------------ nominatim/cli.py | 2 + nominatim/clicmd/transition.py | 10 ++++ nominatim/indexer/indexer.py | 76 +++++++++++++++++++++++++++++-- test/python/test_indexing.py | 58 +++++++++++++++++++++-- test/python/test_tools_refresh.py | 26 +++++++++++ 7 files changed, 184 insertions(+), 60 deletions(-) create mode 100644 test/python/test_tools_refresh.py diff --git a/lib-php/admin/setup.php b/lib-php/admin/setup.php index 6fca7c3c..cb7eeee1 100644 --- a/lib-php/admin/setup.php +++ b/lib-php/admin/setup.php @@ -64,6 +64,16 @@ if ($aCMDResult['verbose']) { $oNominatimCmd->addParams('--verbose'); } +// by default, use all but one processor, but never more than 15. +var_dump($aCMDResult); +$iInstances = max(1, $aCMDResult['threads'] ?? (min(16, getProcessorCount()) - 1)); + +function run($oCmd) { + global $iInstances; + $oCmd->addParams('--threads', $iInstances); + $oCmd->run(true); +} + //******************************************************* // Making some sanity check: @@ -81,7 +91,7 @@ $oSetup = new SetupFunctions($aCMDResult); // go through complete process if 'all' is selected or start selected functions if ($aCMDResult['create-db'] || $aCMDResult['all']) { $bDidSomething = true; - (clone($oNominatimCmd))->addParams('transition', '--create-db')->run(true); + run((clone($oNominatimCmd))->addParams('transition', '--create-db')); } if ($aCMDResult['setup-db'] || $aCMDResult['all']) { @@ -92,7 +102,7 @@ if ($aCMDResult['setup-db'] || $aCMDResult['all']) { $oCmd->addParams('--no-partitions'); } - $oCmd->run(true); + run($oCmd); } if ($aCMDResult['import-data'] || $aCMDResult['all']) { @@ -104,7 +114,7 @@ if ($aCMDResult['import-data'] || $aCMDResult['all']) { $oCmd->addParams('--drop'); } - $oCmd->run(true); + run($oCmd); } if ($aCMDResult['create-functions'] || $aCMDResult['all']) { @@ -131,6 +141,7 @@ if ($aCMDResult['create-partition-functions'] || $aCMDResult['all']) { if ($aCMDResult['import-wikipedia-articles'] || $aCMDResult['all']) { $bDidSomething = true; + // ignore errors! (clone($oNominatimCmd))->addParams('refresh', '--wiki-data')->run(); } @@ -152,12 +163,17 @@ if ($aCMDResult['calculate-postcodes'] || $aCMDResult['all']) { if ($aCMDResult['index'] || $aCMDResult['all']) { $bDidSomething = true; - $oSetup->index($aCMDResult['index-noanalyse']); + $oCmd = (clone($oNominatimCmd))->addParams('transition', '--index'); + if ($aCMDResult['index-noanalyse'] ?? false) { + $oCmd->addParams('--no-analyse'); + } + + run($oCmd); } if ($aCMDResult['drop']) { $bDidSomething = true; - (clone($oNominatimCmd))->addParams('freeze')->run(true); + run((clone($oNominatimCmd))->addParams('freeze')); } if ($aCMDResult['create-search-indices'] || $aCMDResult['all']) { @@ -172,7 +188,7 @@ if ($aCMDResult['create-country-names'] || $aCMDResult['all']) { if ($aCMDResult['setup-website'] || $aCMDResult['all']) { $bDidSomething = true; - (clone($oNominatimCmd))->addParams('refresh', '--website')->run(true); + run((clone($oNominatimCmd))->addParams('refresh', '--website')); } // ****************************************************** diff --git a/lib-php/setup/SetupClass.php b/lib-php/setup/SetupClass.php index 34c97319..e8c145ba 100755 --- a/lib-php/setup/SetupClass.php +++ b/lib-php/setup/SetupClass.php @@ -71,7 +71,6 @@ class SetupFunctions if ($this->bVerbose) { $this->oNominatimCmd->addParams('--verbose'); } - $this->oNominatimCmd->addParams('--threads', $this->iInstances); } public function createFunctions() @@ -380,49 +379,6 @@ class SetupFunctions $this->db()->exec($sSQL); } - public function index($bIndexNoanalyse) - { - $this->checkModulePresence(); // raises exception on failure - - $oBaseCmd = (clone $this->oNominatimCmd)->addParams('index'); - - info('Index ranks 0 - 4'); - $oCmd = (clone $oBaseCmd)->addParams('--maxrank', 4); - - $iStatus = $oCmd->run(); - if ($iStatus != 0) { - fail('error status ' . $iStatus . ' running nominatim!'); - } - if (!$bIndexNoanalyse) $this->pgsqlRunScript('ANALYSE'); - - info('Index administrative boundaries'); - $oCmd = (clone $oBaseCmd)->addParams('--boundaries-only'); - $iStatus = $oCmd->run(); - if ($iStatus != 0) { - fail('error status ' . $iStatus . ' running nominatim!'); - } - - info('Index ranks 5 - 25'); - $oCmd = (clone $oBaseCmd)->addParams('--no-boundaries', '--minrank', 5, '--maxrank', 25); - $iStatus = $oCmd->run(); - if ($iStatus != 0) { - fail('error status ' . $iStatus . ' running nominatim!'); - } - - if (!$bIndexNoanalyse) $this->pgsqlRunScript('ANALYSE'); - - info('Index ranks 26 - 30'); - $oCmd = (clone $oBaseCmd)->addParams('--no-boundaries', '--minrank', 26); - $iStatus = $oCmd->run(); - if ($iStatus != 0) { - fail('error status ' . $iStatus . ' running nominatim!'); - } - - info('Index postcodes'); - $sSQL = 'UPDATE location_postcode SET indexed_status = 0'; - $this->db()->exec($sSQL); - } - public function createSearchIndices() { info('Create Search indices'); diff --git a/nominatim/cli.py b/nominatim/cli.py index e1824cc6..eb652d64 100644 --- a/nominatim/cli.py +++ b/nominatim/cli.py @@ -171,6 +171,8 @@ class SetupAll: params.append('--ignore-errors') if args.index_noanalyse: params.append('--index-noanalyse') + if args.threads: + params.extend(('--threads', args.threads)) return run_legacy_script(*params, nominatim_env=args) diff --git a/nominatim/clicmd/transition.py b/nominatim/clicmd/transition.py index eb4e2d2f..4a5b44f5 100644 --- a/nominatim/clicmd/transition.py +++ b/nominatim/clicmd/transition.py @@ -32,6 +32,8 @@ class AdminTransition: help='Build a blank nominatim db') group.add_argument('--import-data', action='store_true', help='Import a osm file') + group.add_argument('--index', action='store_true', + help='Index the data') group = parser.add_argument_group('Options') group.add_argument('--no-partitions', action='store_true', help='Do not partition search indices') @@ -41,6 +43,8 @@ class AdminTransition: help='Drop tables needed for updates, making the database readonly') group.add_argument('--osm2pgsql-cache', metavar='SIZE', type=int, help='Size of cache to be used by osm2pgsql (in MB)') + group.add_argument('--no-analyse', action='store_true', + help='Do not perform analyse operations during index') @staticmethod def run(args): @@ -69,3 +73,9 @@ class AdminTransition: database_import.import_osm_data(Path(args.osm_file), args.osm2pgsql_options(0, 1), drop=args.drop) + + if args.index: + LOG.warning('Indexing') + from ..indexer.indexer import Indexer + indexer = Indexer(args.config.get_libpq_dsn(), args.threads or 1) + indexer.index_full() diff --git a/nominatim/indexer/indexer.py b/nominatim/indexer/indexer.py index 6e0ed60f..61971497 100644 --- a/nominatim/indexer/indexer.py +++ b/nominatim/indexer/indexer.py @@ -61,8 +61,8 @@ class InterpolationRunner: @staticmethod def sql_index_place(ids): return """UPDATE location_property_osmline - SET indexed_status = 0 WHERE place_id IN ({})"""\ - .format(','.join((str(i) for i in ids))) + SET indexed_status = 0 WHERE place_id IN ({}) + """.format(','.join((str(i) for i in ids))) class BoundaryRunner: """ Returns SQL commands for indexing the administrative boundaries @@ -79,19 +79,46 @@ class BoundaryRunner: return """SELECT count(*) FROM placex WHERE indexed_status > 0 AND rank_search = {} - AND class = 'boundary' and type = 'administrative'""".format(self.rank) + AND class = 'boundary' and type = 'administrative' + """.format(self.rank) def sql_get_objects(self): return """SELECT place_id FROM placex WHERE indexed_status > 0 and rank_search = {} and class = 'boundary' and type = 'administrative' - ORDER BY partition, admin_level""".format(self.rank) + ORDER BY partition, admin_level + """.format(self.rank) @staticmethod def sql_index_place(ids): return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\ .format(','.join((str(i) for i in ids))) + +class PostcodeRunner: + """ Provides the SQL commands for indexing the location_postcode table. + """ + + @staticmethod + def name(): + return "postcodes (location_postcode)" + + @staticmethod + def sql_count_objects(): + return 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0' + + @staticmethod + def sql_get_objects(): + return """SELECT place_id FROM location_postcode + WHERE indexed_status > 0 + ORDER BY country_code, postcode""" + + @staticmethod + def sql_index_place(ids): + return """UPDATE location_postcode SET indexed_status = 0 + WHERE place_id IN ({}) + """.format(','.join((str(i) for i in ids))) + class Indexer: """ Main indexing routine. """ @@ -100,7 +127,36 @@ class Indexer: self.conn = psycopg2.connect(dsn) self.threads = [DBConnection(dsn) for _ in range(num_threads)] + + def index_full(self, analyse=True): + """ Index the complete database. This will first index boudnaries + followed by all other objects. When `analyse` is True, then the + database will be analysed at the appropriate places to + ensure that database statistics are updated. + """ + self.index_by_rank(0, 4) + self._analyse_db_if(analyse) + + self.index_boundaries(0, 30) + self._analyse_db_if(analyse) + + self.index_by_rank(5, 25) + self._analyse_db_if(analyse) + + self.index_by_rank(26, 30) + self._analyse_db_if(analyse) + + self.index_postcodes() + self._analyse_db_if(analyse) + + def _analyse_db_if(self, condition): + if condition: + with self.conn.cursor() as cur: + cur.execute('ANALYSE') + def index_boundaries(self, minrank, maxrank): + """ Index only administrative boundaries within the given rank range. + """ LOG.warning("Starting indexing boundaries using %s threads", len(self.threads)) @@ -108,7 +164,11 @@ class Indexer: self.index(BoundaryRunner(rank)) def index_by_rank(self, minrank, maxrank): - """ Run classic indexing by rank. + """ Index all entries of placex in the given rank range (inclusive) + in order of their address rank. + + When rank 30 is requested then also interpolations and + places with address rank 0 will be indexed. """ maxrank = min(maxrank, 30) LOG.warning("Starting indexing rank (%i to %i) using %i threads", @@ -124,6 +184,12 @@ class Indexer: else: self.index(RankRunner(maxrank)) + + def index_postcodes(self): + """Index the entries ofthe location_postcode table. + """ + self.index(PostcodeRunner(), 20) + def update_status_table(self): """ Update the status in the status table to 'indexed'. """ diff --git a/test/python/test_indexing.py b/test/python/test_indexing.py index 6b52a65e..ee9c6c7e 100644 --- a/test/python/test_indexing.py +++ b/test/python/test_indexing.py @@ -12,6 +12,7 @@ class IndexerTestDB: def __init__(self, conn): self.placex_id = itertools.count(100000) self.osmline_id = itertools.count(500000) + self.postcode_id = itertools.count(700000) self.conn = conn self.conn.set_isolation_level(0) @@ -31,6 +32,12 @@ class IndexerTestDB: indexed_status SMALLINT, indexed_date TIMESTAMP, geometry_sector INTEGER)""") + cur.execute("""CREATE TABLE location_postcode ( + place_id BIGINT, + indexed_status SMALLINT, + indexed_date TIMESTAMP, + country_code varchar(2), + postcode TEXT)""") cur.execute("""CREATE OR REPLACE FUNCTION date_update() RETURNS TRIGGER AS $$ BEGIN @@ -39,10 +46,10 @@ class IndexerTestDB: END IF; RETURN NEW; END; $$ LANGUAGE plpgsql;""") - cur.execute("""CREATE TRIGGER placex_update BEFORE UPDATE ON placex - FOR EACH ROW EXECUTE PROCEDURE date_update()""") - cur.execute("""CREATE TRIGGER osmline_update BEFORE UPDATE ON location_property_osmline - FOR EACH ROW EXECUTE PROCEDURE date_update()""") + for table in ('placex', 'location_property_osmline', 'location_postcode'): + cur.execute("""CREATE TRIGGER {0}_update BEFORE UPDATE ON {0} + FOR EACH ROW EXECUTE PROCEDURE date_update() + """.format(table)) def scalar(self, query): with self.conn.cursor() as cur: @@ -74,6 +81,15 @@ class IndexerTestDB: (next_id, sector)) return next_id + def add_postcode(self, country, postcode): + next_id = next(self.postcode_id) + with self.conn.cursor() as cur: + cur.execute("""INSERT INTO location_postcode + (place_id, indexed_status, country_code, postcode) + VALUES (%s, 1, %s, %s)""", + (next_id, country, postcode)) + return next_id + def placex_unindexed(self): return self.scalar('SELECT count(*) from placex where indexed_status > 0') @@ -87,7 +103,7 @@ def test_db(temp_db_conn): @pytest.mark.parametrize("threads", [1, 15]) -def test_index_full(test_db, threads): +def test_index_all_by_rank(test_db, threads): for rank in range(31): test_db.add_place(rank_address=rank, rank_search=rank) test_db.add_osmline() @@ -184,3 +200,35 @@ def test_index_boundaries(test_db, threads): assert 0 == test_db.scalar(""" SELECT count(*) FROM placex WHERE indexed_status = 0 AND class != 'boundary'""") + + +@pytest.mark.parametrize("threads", [1, 15]) +def test_index_postcodes(test_db, threads): + for postcode in range(1000): + test_db.add_postcode('de', postcode) + for postcode in range(32000, 33000): + test_db.add_postcode('us', postcode) + + idx = Indexer('dbname=test_nominatim_python_unittest', threads) + idx.index_postcodes() + + assert 0 == test_db.scalar("""SELECT count(*) FROM location_postcode + WHERE indexed_status != 0""") + + +def test_index_full(test_db): + for rank in range(4, 10): + test_db.add_admin(rank_address=rank, rank_search=rank) + for rank in range(31): + test_db.add_place(rank_address=rank, rank_search=rank) + test_db.add_osmline() + for postcode in range(1000): + test_db.add_postcode('de', postcode) + + idx = Indexer('dbname=test_nominatim_python_unittest', 4) + idx.index_full() + + assert 0 == test_db.placex_unindexed() + assert 0 == test_db.osmline_unindexed() + assert 0 == test_db.scalar("""SELECT count(*) FROM location_postcode + WHERE indexed_status != 0""") diff --git a/test/python/test_tools_refresh.py b/test/python/test_tools_refresh.py new file mode 100644 index 00000000..d6c46ad7 --- /dev/null +++ b/test/python/test_tools_refresh.py @@ -0,0 +1,26 @@ +""" +Test for various refresh functions. +""" +from pathlib import Path + +import pytest + +from nominatim.tools import refresh + +TEST_DIR = (Path(__file__) / '..' / '..').resolve() + +def test_refresh_import_wikipedia_not_existing(dsn): + assert 1 == refresh.import_wikipedia_articles(dsn, Path('.')) + + +@pytest.mark.parametrize("replace", (True, False)) +def test_refresh_import_wikipedia(dsn, table_factory, temp_db_cursor, replace): + if replace: + table_factory('wikipedia_article') + table_factory('wikipedia_redirect') + + # use the small wikipedia file for the API testdb + assert 0 == refresh.import_wikipedia_articles(dsn, TEST_DIR / 'testdb') + + assert temp_db_cursor.scalar('SELECT count(*) FROM wikipedia_article') > 0 + assert temp_db_cursor.scalar('SELECT count(*) FROM wikipedia_redirect') > 0 -- 2.39.5