--- /dev/null
+"""
+Main work horse for indexing (computing addresses) the database.
+"""
+# pylint: disable=C0111
+import logging
+import select
+
+from .progress import ProgressLogger
+from db.async_connection import DBConnection, make_connection
+
+LOG = logging.getLogger()
+
+class RankRunner:
+ """ Returns SQL commands for indexing one rank within the placex table.
+ """
+
+ def __init__(self, rank):
+ self.rank = rank
+
+ def name(self):
+ return "rank {}".format(self.rank)
+
+ def sql_count_objects(self):
+ return """SELECT count(*) FROM placex
+ WHERE rank_address = {} and indexed_status > 0
+ """.format(self.rank)
+
+ def sql_get_objects(self):
+ return """SELECT place_id FROM placex
+ WHERE indexed_status > 0 and rank_address = {}
+ ORDER BY geometry_sector""".format(self.rank)
+
+ @staticmethod
+ def sql_index_place(ids):
+ return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
+ .format(','.join((str(i) for i in ids)))
+
+
+class InterpolationRunner:
+ """ Returns SQL commands for indexing the address interpolation table
+ location_property_osmline.
+ """
+
+ @staticmethod
+ def name():
+ return "interpolation lines (location_property_osmline)"
+
+ @staticmethod
+ def sql_count_objects():
+ return """SELECT count(*) FROM location_property_osmline
+ WHERE indexed_status > 0"""
+
+ @staticmethod
+ def sql_get_objects():
+ return """SELECT place_id FROM location_property_osmline
+ WHERE indexed_status > 0
+ ORDER BY geometry_sector"""
+
+ @staticmethod
+ def sql_index_place(ids):
+ return """UPDATE location_property_osmline
+ SET indexed_status = 0 WHERE place_id IN ({})"""\
+ .format(','.join((str(i) for i in ids)))
+
+class BoundaryRunner:
+ """ Returns SQL commands for indexing the administrative boundaries
+ of a certain rank.
+ """
+
+ def __init__(self, rank):
+ self.rank = rank
+
+ def name(self):
+ return "boundaries rank {}".format(self.rank)
+
+ def sql_count_objects(self):
+ return """SELECT count(*) FROM placex
+ WHERE indexed_status > 0
+ AND rank_search = {}
+ AND class = 'boundary' and type = 'administrative'""".format(self.rank)
+
+ def sql_get_objects(self):
+ return """SELECT place_id FROM placex
+ WHERE indexed_status > 0 and rank_search = {}
+ and class = 'boundary' and type = 'administrative'
+ ORDER BY partition, admin_level""".format(self.rank)
+
+ @staticmethod
+ def sql_index_place(ids):
+ return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
+ .format(','.join((str(i) for i in ids)))
+
+class Indexer:
+ """ Main indexing routine.
+ """
+
+ def __init__(self, opts):
+ self.minrank = max(1, opts.minrank)
+ self.maxrank = min(30, opts.maxrank)
+ self.conn = make_connection(opts)
+ self.threads = [DBConnection(opts) for _ in range(opts.threads)]
+
+ def index_boundaries(self):
+ LOG.warning("Starting indexing boundaries using %s threads",
+ len(self.threads))
+
+ for rank in range(max(self.minrank, 5), min(self.maxrank, 26)):
+ self.index(BoundaryRunner(rank))
+
+ def index_by_rank(self):
+ """ Run classic indexing by rank.
+ """
+ LOG.warning("Starting indexing rank (%i to %i) using %i threads",
+ self.minrank, self.maxrank, len(self.threads))
+
+ for rank in range(max(1, self.minrank), self.maxrank):
+ self.index(RankRunner(rank))
+
+ if self.maxrank == 30:
+ self.index(RankRunner(0))
+ self.index(InterpolationRunner(), 20)
+ self.index(RankRunner(self.maxrank), 20)
+ else:
+ self.index(RankRunner(self.maxrank))
+
+ def index(self, obj, batch=1):
+ """ Index a single rank or table. `obj` describes the SQL to use
+ for indexing. `batch` describes the number of objects that
+ should be processed with a single SQL statement
+ """
+ LOG.warning("Starting %s (using batch size %s)", obj.name(), batch)
+
+ cur = self.conn.cursor()
+ cur.execute(obj.sql_count_objects())
+
+ total_tuples = cur.fetchone()[0]
+ LOG.debug("Total number of rows: %i", total_tuples)
+
+ cur.close()
+
+ progress = ProgressLogger(obj.name(), total_tuples)
+
+ if total_tuples > 0:
+ cur = self.conn.cursor(name='places')
+ cur.execute(obj.sql_get_objects())
+
+ next_thread = self.find_free_thread()
+ while True:
+ places = [p[0] for p in cur.fetchmany(batch)]
+ if not places:
+ break
+
+ LOG.debug("Processing places: %s", str(places))
+ thread = next(next_thread)
+
+ thread.perform(obj.sql_index_place(places))
+ progress.add(len(places))
+
+ cur.close()
+
+ for thread in self.threads:
+ thread.wait()
+
+ progress.done()
+
+ def find_free_thread(self):
+ """ Generator that returns the next connection that is free for
+ sending a query.
+ """
+ ready = self.threads
+ command_stat = 0
+
+ while True:
+ for thread in ready:
+ if thread.is_done():
+ command_stat += 1
+ yield thread
+
+ # refresh the connections occasionaly to avoid potential
+ # memory leaks in Postgresql.
+ if command_stat > 100000:
+ for thread in self.threads:
+ while not thread.is_done():
+ thread.wait()
+ thread.connect()
+ command_stat = 0
+ ready = self.threads
+ else:
+ ready, _, _ = select.select(self.threads, [], [])
+
+ assert False, "Unreachable code"
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#-----------------------------------------------------------------------------
-# pylint: disable=C0111
from argparse import ArgumentParser, RawDescriptionHelpFormatter
import logging
import sys
import getpass
-import select
-
-from indexer.progress import ProgressLogger # pylint: disable=E0401
-from indexer.db import DBConnection, make_connection # pylint: disable=E0401
-
-LOG = logging.getLogger()
-
-class RankRunner:
- """ Returns SQL commands for indexing one rank within the placex table.
- """
-
- def __init__(self, rank):
- self.rank = rank
-
- def name(self):
- return "rank {}".format(self.rank)
-
- def sql_count_objects(self):
- return """SELECT count(*) FROM placex
- WHERE rank_address = {} and indexed_status > 0
- """.format(self.rank)
-
- def sql_get_objects(self):
- return """SELECT place_id FROM placex
- WHERE indexed_status > 0 and rank_address = {}
- ORDER BY geometry_sector""".format(self.rank)
-
- @staticmethod
- def sql_index_place(ids):
- return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
- .format(','.join((str(i) for i in ids)))
-
-
-class InterpolationRunner:
- """ Returns SQL commands for indexing the address interpolation table
- location_property_osmline.
- """
-
- @staticmethod
- def name():
- return "interpolation lines (location_property_osmline)"
-
- @staticmethod
- def sql_count_objects():
- return """SELECT count(*) FROM location_property_osmline
- WHERE indexed_status > 0"""
-
- @staticmethod
- def sql_get_objects():
- return """SELECT place_id FROM location_property_osmline
- WHERE indexed_status > 0
- ORDER BY geometry_sector"""
-
- @staticmethod
- def sql_index_place(ids):
- return """UPDATE location_property_osmline
- SET indexed_status = 0 WHERE place_id IN ({})"""\
- .format(','.join((str(i) for i in ids)))
-
-class BoundaryRunner:
- """ Returns SQL commands for indexing the administrative boundaries
- of a certain rank.
- """
-
- def __init__(self, rank):
- self.rank = rank
-
- def name(self):
- return "boundaries rank {}".format(self.rank)
-
- def sql_count_objects(self):
- return """SELECT count(*) FROM placex
- WHERE indexed_status > 0
- AND rank_search = {}
- AND class = 'boundary' and type = 'administrative'""".format(self.rank)
-
- def sql_get_objects(self):
- return """SELECT place_id FROM placex
- WHERE indexed_status > 0 and rank_search = {}
- and class = 'boundary' and type = 'administrative'
- ORDER BY partition, admin_level""".format(self.rank)
-
- @staticmethod
- def sql_index_place(ids):
- return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
- .format(','.join((str(i) for i in ids)))
-
-class Indexer:
- """ Main indexing routine.
- """
-
- def __init__(self, opts):
- self.minrank = max(1, opts.minrank)
- self.maxrank = min(30, opts.maxrank)
- self.conn = make_connection(opts)
- self.threads = [DBConnection(opts) for _ in range(opts.threads)]
-
- def index_boundaries(self):
- LOG.warning("Starting indexing boundaries using %s threads",
- len(self.threads))
-
- for rank in range(max(self.minrank, 5), min(self.maxrank, 26)):
- self.index(BoundaryRunner(rank))
-
- def index_by_rank(self):
- """ Run classic indexing by rank.
- """
- LOG.warning("Starting indexing rank (%i to %i) using %i threads",
- self.minrank, self.maxrank, len(self.threads))
-
- for rank in range(max(1, self.minrank), self.maxrank):
- self.index(RankRunner(rank))
-
- if self.maxrank == 30:
- self.index(RankRunner(0))
- self.index(InterpolationRunner(), 20)
- self.index(RankRunner(self.maxrank), 20)
- else:
- self.index(RankRunner(self.maxrank))
-
- def index(self, obj, batch=1):
- """ Index a single rank or table. `obj` describes the SQL to use
- for indexing. `batch` describes the number of objects that
- should be processed with a single SQL statement
- """
- LOG.warning("Starting %s (using batch size %s)", obj.name(), batch)
-
- cur = self.conn.cursor()
- cur.execute(obj.sql_count_objects())
-
- total_tuples = cur.fetchone()[0]
- LOG.debug("Total number of rows: %i", total_tuples)
-
- cur.close()
-
- progress = ProgressLogger(obj.name(), total_tuples)
-
- if total_tuples > 0:
- cur = self.conn.cursor(name='places')
- cur.execute(obj.sql_get_objects())
-
- next_thread = self.find_free_thread()
- while True:
- places = [p[0] for p in cur.fetchmany(batch)]
- if not places:
- break
-
- LOG.debug("Processing places: %s", str(places))
- thread = next(next_thread)
-
- thread.perform(obj.sql_index_place(places))
- progress.add(len(places))
-
- cur.close()
-
- for thread in self.threads:
- thread.wait()
-
- progress.done()
-
- def find_free_thread(self):
- """ Generator that returns the next connection that is free for
- sending a query.
- """
- ready = self.threads
- command_stat = 0
-
- while True:
- for thread in ready:
- if thread.is_done():
- command_stat += 1
- yield thread
-
- # refresh the connections occasionaly to avoid potential
- # memory leaks in Postgresql.
- if command_stat > 100000:
- for thread in self.threads:
- while not thread.is_done():
- thread.wait()
- thread.connect()
- command_stat = 0
- ready = self.threads
- else:
- ready, _, _ = select.select(self.threads, [], [])
-
- assert False, "Unreachable code"
+from indexer.indexer import Indexer
def nominatim_arg_parser():
""" Setup the command-line parser for the tool.
return parser
if __name__ == '__main__':
- logging.basicConfig(stream=sys.stderr, format='%(levelname)s: %(message)s')
-
OPTIONS = nominatim_arg_parser().parse_args(sys.argv[1:])
- LOG.setLevel(max(3 - OPTIONS.loglevel, 0) * 10)
+ logging.basicConfig(stream=sys.stderr, format='%(levelname)s: %(message)s',
+ level=max(3 - OPTIONS.loglevel, 0) * 10)
OPTIONS.password = None
if OPTIONS.password_prompt: