Also moves postcodes to fully parallel indexing.
$oNominatimCmd->addParams('--verbose');
}
+// By default, use all but one processor, but never more than 15 threads.
+// An explicit 'threads' command-line value always wins (clamped to >= 1).
+$iInstances = max(1, $aCMDResult['threads'] ?? (min(16, getProcessorCount()) - 1));
+
+// Run the given nominatim command, forwarding the configured thread count.
+function run($oCmd) {
+    global $iInstances;
+    $oCmd->addParams('--threads', $iInstances);
+    $oCmd->run(true);
+}
+
//*******************************************************
// Making some sanity check:
// go through complete process if 'all' is selected or start selected functions
if ($aCMDResult['create-db'] || $aCMDResult['all']) {
$bDidSomething = true;
- (clone($oNominatimCmd))->addParams('transition', '--create-db')->run(true);
+ run((clone($oNominatimCmd))->addParams('transition', '--create-db'));
}
if ($aCMDResult['setup-db'] || $aCMDResult['all']) {
$oCmd->addParams('--no-partitions');
}
- $oCmd->run(true);
+ run($oCmd);
}
if ($aCMDResult['import-data'] || $aCMDResult['all']) {
$oCmd->addParams('--drop');
}
- $oCmd->run(true);
+ run($oCmd);
}
if ($aCMDResult['create-functions'] || $aCMDResult['all']) {
if ($aCMDResult['import-wikipedia-articles'] || $aCMDResult['all']) {
$bDidSomething = true;
+        // Errors are deliberately ignored: the wikipedia import is optional.
(clone($oNominatimCmd))->addParams('refresh', '--wiki-data')->run();
}
if ($aCMDResult['index'] || $aCMDResult['all']) {
$bDidSomething = true;
- $oSetup->index($aCMDResult['index-noanalyse']);
+ $oCmd = (clone($oNominatimCmd))->addParams('transition', '--index');
+ if ($aCMDResult['index-noanalyse'] ?? false) {
+ $oCmd->addParams('--no-analyse');
+ }
+
+ run($oCmd);
}
if ($aCMDResult['drop']) {
$bDidSomething = true;
- (clone($oNominatimCmd))->addParams('freeze')->run(true);
+ run((clone($oNominatimCmd))->addParams('freeze'));
}
if ($aCMDResult['create-search-indices'] || $aCMDResult['all']) {
if ($aCMDResult['setup-website'] || $aCMDResult['all']) {
$bDidSomething = true;
- (clone($oNominatimCmd))->addParams('refresh', '--website')->run(true);
+ run((clone($oNominatimCmd))->addParams('refresh', '--website'));
}
// ******************************************************
if ($this->bVerbose) {
$this->oNominatimCmd->addParams('--verbose');
}
- $this->oNominatimCmd->addParams('--threads', $this->iInstances);
}
public function createFunctions()
$this->db()->exec($sSQL);
}
- public function index($bIndexNoanalyse)
- {
- $this->checkModulePresence(); // raises exception on failure
-
- $oBaseCmd = (clone $this->oNominatimCmd)->addParams('index');
-
- info('Index ranks 0 - 4');
- $oCmd = (clone $oBaseCmd)->addParams('--maxrank', 4);
-
- $iStatus = $oCmd->run();
- if ($iStatus != 0) {
- fail('error status ' . $iStatus . ' running nominatim!');
- }
- if (!$bIndexNoanalyse) $this->pgsqlRunScript('ANALYSE');
-
- info('Index administrative boundaries');
- $oCmd = (clone $oBaseCmd)->addParams('--boundaries-only');
- $iStatus = $oCmd->run();
- if ($iStatus != 0) {
- fail('error status ' . $iStatus . ' running nominatim!');
- }
-
- info('Index ranks 5 - 25');
- $oCmd = (clone $oBaseCmd)->addParams('--no-boundaries', '--minrank', 5, '--maxrank', 25);
- $iStatus = $oCmd->run();
- if ($iStatus != 0) {
- fail('error status ' . $iStatus . ' running nominatim!');
- }
-
- if (!$bIndexNoanalyse) $this->pgsqlRunScript('ANALYSE');
-
- info('Index ranks 26 - 30');
- $oCmd = (clone $oBaseCmd)->addParams('--no-boundaries', '--minrank', 26);
- $iStatus = $oCmd->run();
- if ($iStatus != 0) {
- fail('error status ' . $iStatus . ' running nominatim!');
- }
-
- info('Index postcodes');
- $sSQL = 'UPDATE location_postcode SET indexed_status = 0';
- $this->db()->exec($sSQL);
- }
-
public function createSearchIndices()
{
info('Create Search indices');
params.append('--ignore-errors')
if args.index_noanalyse:
params.append('--index-noanalyse')
+ if args.threads:
+ params.extend(('--threads', args.threads))
return run_legacy_script(*params, nominatim_env=args)
help='Build a blank nominatim db')
group.add_argument('--import-data', action='store_true',
help='Import a osm file')
+ group.add_argument('--index', action='store_true',
+ help='Index the data')
group = parser.add_argument_group('Options')
group.add_argument('--no-partitions', action='store_true',
help='Do not partition search indices')
help='Drop tables needed for updates, making the database readonly')
group.add_argument('--osm2pgsql-cache', metavar='SIZE', type=int,
help='Size of cache to be used by osm2pgsql (in MB)')
+ group.add_argument('--no-analyse', action='store_true',
+ help='Do not perform analyse operations during index')
@staticmethod
def run(args):
database_import.import_osm_data(Path(args.osm_file),
args.osm2pgsql_options(0, 1),
drop=args.drop)
+
+ if args.index:
+ LOG.warning('Indexing')
+ from ..indexer.indexer import Indexer
+ indexer = Indexer(args.config.get_libpq_dsn(), args.threads or 1)
+ indexer.index_full()
@staticmethod
def sql_index_place(ids):
return """UPDATE location_property_osmline
- SET indexed_status = 0 WHERE place_id IN ({})"""\
- .format(','.join((str(i) for i in ids)))
+ SET indexed_status = 0 WHERE place_id IN ({})
+ """.format(','.join((str(i) for i in ids)))
class BoundaryRunner:
""" Returns SQL commands for indexing the administrative boundaries
return """SELECT count(*) FROM placex
WHERE indexed_status > 0
AND rank_search = {}
- AND class = 'boundary' and type = 'administrative'""".format(self.rank)
+ AND class = 'boundary' and type = 'administrative'
+ """.format(self.rank)
def sql_get_objects(self):
return """SELECT place_id FROM placex
WHERE indexed_status > 0 and rank_search = {}
and class = 'boundary' and type = 'administrative'
- ORDER BY partition, admin_level""".format(self.rank)
+ ORDER BY partition, admin_level
+ """.format(self.rank)
@staticmethod
def sql_index_place(ids):
return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
.format(','.join((str(i) for i in ids)))
+
+class PostcodeRunner:
+    """ Provides the SQL commands for indexing the location_postcode table.
+    """
+
+    @staticmethod
+    def name():
+        # Human-readable name used in progress/log output.
+        return "postcodes (location_postcode)"
+
+    @staticmethod
+    def sql_count_objects():
+        # Number of postcode rows still waiting to be indexed.
+        return 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0'
+
+    @staticmethod
+    def sql_get_objects():
+        # NOTE(review): ordering by country/postcode presumably groups related
+        # rows into the same work batch -- confirm that is the intent.
+        return """SELECT place_id FROM location_postcode
+                  WHERE indexed_status > 0
+                  ORDER BY country_code, postcode"""
+
+    @staticmethod
+    def sql_index_place(ids):
+        # SQL that marks the given place_ids as indexed.
+        return """UPDATE location_postcode SET indexed_status = 0
+                  WHERE place_id IN ({})
+                  """.format(','.join((str(i) for i in ids)))
+
+
class Indexer:
""" Main indexing routine.
"""
self.conn = psycopg2.connect(dsn)
self.threads = [DBConnection(dsn) for _ in range(num_threads)]
+
+    def index_full(self, analyse=True):
+        """ Index the complete database. This will first index boundaries
+            followed by all other objects. When `analyse` is True, then the
+            database will be analysed at the appropriate places to
+            ensure that database statistics are updated.
+        """
+        self.index_by_rank(0, 4)
+        self._analyse_db_if(analyse)
+
+        self.index_boundaries(0, 30)
+        self._analyse_db_if(analyse)
+
+        self.index_by_rank(5, 25)
+        self._analyse_db_if(analyse)
+
+        self.index_by_rank(26, 30)
+        self._analyse_db_if(analyse)
+
+        self.index_postcodes()
+        self._analyse_db_if(analyse)
+
+    def _analyse_db_if(self, condition):
+        """ Run ANALYSE on the database, but only when `condition` is True.
+        """
+        if condition:
+            with self.conn.cursor() as cur:
+                cur.execute('ANALYSE')
+
def index_boundaries(self, minrank, maxrank):
+ """ Index only administrative boundaries within the given rank range.
+ """
LOG.warning("Starting indexing boundaries using %s threads",
len(self.threads))
self.index(BoundaryRunner(rank))
def index_by_rank(self, minrank, maxrank):
- """ Run classic indexing by rank.
+ """ Index all entries of placex in the given rank range (inclusive)
+ in order of their address rank.
+
+ When rank 30 is requested then also interpolations and
+ places with address rank 0 will be indexed.
"""
maxrank = min(maxrank, 30)
LOG.warning("Starting indexing rank (%i to %i) using %i threads",
else:
self.index(RankRunner(maxrank))
+
+    def index_postcodes(self):
+        """ Index the entries of the location_postcode table.
+        """
+        # NOTE(review): 20 looks like a batch-size argument to index() -- confirm.
+        self.index(PostcodeRunner(), 20)
+
def update_status_table(self):
""" Update the status in the status table to 'indexed'.
"""
def __init__(self, conn):
self.placex_id = itertools.count(100000)
self.osmline_id = itertools.count(500000)
+ self.postcode_id = itertools.count(700000)
self.conn = conn
self.conn.set_isolation_level(0)
indexed_status SMALLINT,
indexed_date TIMESTAMP,
geometry_sector INTEGER)""")
+ cur.execute("""CREATE TABLE location_postcode (
+ place_id BIGINT,
+ indexed_status SMALLINT,
+ indexed_date TIMESTAMP,
+ country_code varchar(2),
+ postcode TEXT)""")
cur.execute("""CREATE OR REPLACE FUNCTION date_update() RETURNS TRIGGER
AS $$
BEGIN
END IF;
RETURN NEW;
END; $$ LANGUAGE plpgsql;""")
- cur.execute("""CREATE TRIGGER placex_update BEFORE UPDATE ON placex
- FOR EACH ROW EXECUTE PROCEDURE date_update()""")
- cur.execute("""CREATE TRIGGER osmline_update BEFORE UPDATE ON location_property_osmline
- FOR EACH ROW EXECUTE PROCEDURE date_update()""")
+ for table in ('placex', 'location_property_osmline', 'location_postcode'):
+ cur.execute("""CREATE TRIGGER {0}_update BEFORE UPDATE ON {0}
+ FOR EACH ROW EXECUTE PROCEDURE date_update()
+ """.format(table))
def scalar(self, query):
with self.conn.cursor() as cur:
(next_id, sector))
return next_id
+    def add_postcode(self, country, postcode):
+        # Test helper: insert a postcode row pending indexing
+        # (indexed_status = 1) and return its newly allocated place_id.
+        next_id = next(self.postcode_id)
+        with self.conn.cursor() as cur:
+            cur.execute("""INSERT INTO location_postcode
+                           (place_id, indexed_status, country_code, postcode)
+                           VALUES (%s, 1, %s, %s)""",
+                        (next_id, country, postcode))
+        return next_id
+
+
def placex_unindexed(self):
return self.scalar('SELECT count(*) from placex where indexed_status > 0')
@pytest.mark.parametrize("threads", [1, 15])
-def test_index_full(test_db, threads):
+def test_index_all_by_rank(test_db, threads):
for rank in range(31):
test_db.add_place(rank_address=rank, rank_search=rank)
test_db.add_osmline()
assert 0 == test_db.scalar("""
SELECT count(*) FROM placex
WHERE indexed_status = 0 AND class != 'boundary'""")
+
+
+@pytest.mark.parametrize("threads", [1, 15])
+def test_index_postcodes(test_db, threads):
+    # Two postcode ranges in two countries; index_postcodes() must clear
+    # the indexed_status flag on every row, for any thread count.
+    for postcode in range(1000):
+        test_db.add_postcode('de', postcode)
+    for postcode in range(32000, 33000):
+        test_db.add_postcode('us', postcode)
+
+    idx = Indexer('dbname=test_nominatim_python_unittest', threads)
+    idx.index_postcodes()
+
+    assert 0 == test_db.scalar("""SELECT count(*) FROM location_postcode
+                                  WHERE indexed_status != 0""")
+
+
+def test_index_full(test_db):
+    # Populate every object kind a full index run must cover: admin
+    # boundaries, places of all ranks, an interpolation line and postcodes.
+    for rank in range(4, 10):
+        test_db.add_admin(rank_address=rank, rank_search=rank)
+    for rank in range(31):
+        test_db.add_place(rank_address=rank, rank_search=rank)
+    test_db.add_osmline()
+    for postcode in range(1000):
+        test_db.add_postcode('de', postcode)
+
+    idx = Indexer('dbname=test_nominatim_python_unittest', 4)
+    idx.index_full()
+
+    # Everything must come out with indexed_status == 0.
+    assert 0 == test_db.placex_unindexed()
+    assert 0 == test_db.osmline_unindexed()
+    assert 0 == test_db.scalar("""SELECT count(*) FROM location_postcode
+                                  WHERE indexed_status != 0""")
--- /dev/null
+"""
+Test for various refresh functions.
+"""
+from pathlib import Path
+
+import pytest
+
+from nominatim.tools import refresh
+
+TEST_DIR = (Path(__file__) / '..' / '..').resolve()
+
+def test_refresh_import_wikipedia_not_existing(dsn):
+    # A directory without the wikipedia dump files must yield exit code 1.
+    assert 1 == refresh.import_wikipedia_articles(dsn, Path('.'))
+
+
+@pytest.mark.parametrize("replace", (True, False))
+def test_refresh_import_wikipedia(dsn, table_factory, temp_db_cursor, replace):
+    # When `replace` is set, pre-create the tables so the import has to
+    # drop and recreate them.
+    if replace:
+        table_factory('wikipedia_article')
+        table_factory('wikipedia_redirect')
+
+    # use the small wikipedia file for the API testdb
+    assert 0 == refresh.import_wikipedia_articles(dsn, TEST_DIR / 'testdb')
+
+    assert temp_db_cursor.scalar('SELECT count(*) FROM wikipedia_article') > 0
+    assert temp_db_cursor.scalar('SELECT count(*) FROM wikipedia_redirect') > 0