From 4a9502bf880d8459088e93c5da39770405dde699 Mon Sep 17 00:00:00 2001 From: Sarah Hoffmann Date: Mon, 20 Jan 2020 00:05:28 +0100 Subject: [PATCH] fix SQL and some other stuff --- nominatim/nominatim.py | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/nominatim/nominatim.py b/nominatim/nominatim.py index 6b25cf5c..f525b744 100644 --- a/nominatim/nominatim.py +++ b/nominatim/nominatim.py @@ -96,18 +96,16 @@ class Indexer(object): self.conn = make_connection(options) self.threads = [] - self.poll = select.poll() for i in range(options.threads): t = IndexingThread(i, options) self.threads.append(t) - self.poll.register(t, select.EPOLLIN) def run(self): log.info("Starting indexing rank ({} to {}) using {} threads".format( self.options.minrank, self.options.maxrank, self.options.threads)) - for rank in range(self.options.minrank, 30): + for rank in range(self.options.minrank, min(self.options.maxrank, 30)): self.index(RankRunner(rank)) if self.options.maxrank >= 30: @@ -117,7 +115,7 @@ class Indexer(object): def index(self, obj): log.info("Starting {}".format(obj.name())) - cur = self.conn.cursor(name="main") + cur = self.conn.cursor(name='main') cur.execute(obj.sql_index_sectors()) total_tuples = 0 @@ -130,25 +128,29 @@ class Indexer(object): next_thread = self.find_free_thread() done_tuples = 0 rank_start_time = datetime.now() + + sector_sql = obj.sql_sector_places() + index_sql = obj.sql_index_place() + min_grouped_tuples = total_tuples - len(self.threads) * 1000 for r in cur: sector = r[0] # Should we do the remaining ones together? - do_all = total_tuples - done_tuples < len(self.threads) * 1000 + do_all = done_tuples > min_grouped_tuples pcur = self.conn.cursor(name='places') if do_all: pcur.execute(obj.sql_nosector_places()) else: - pcur.execute(obj.sql_sector_places(), (sector, )) + pcur.execute(sector_sql, (sector, )) for place in pcur: place_id = place[0] log.debug("Processing place {}".format(place_id)) thread = next(next_thread) - thread.perform(obj.sql_index_place(), (place_id,)) + thread.perform(index_sql, (place_id,)) done_tuples += 1 pcur.close() @@ -164,24 +166,19 @@ class Indexer(object): rank_end_time = datetime.now() diff_seconds = (rank_end_time-rank_start_time).total_seconds() - log.info("Done {} in {} @ {} per second - FINISHED {}\n".format( - done_tuples, int(diff_seconds), + log.info("Done {}/{} in {} @ {} per second - FINISHED {}\n".format( + done_tuples, total_tuples, int(diff_seconds), done_tuples/diff_seconds, obj.name())) def find_free_thread(self): - thread_lookup = { t.fileno() : t for t in self.threads} - - done_fids = [ t.fileno() for t in self.threads ] + ready = self.threads while True: - for fid in done_fids: - thread = thread_lookup[fid] + for thread in ready: if thread.is_done(): yield thread - else: - print("not good", fid) - done_fids = [ x[0] for x in self.poll.poll()] + ready, _, _ = select.select(self.threads, [], []) assert(False, "Unreachable code") @@ -212,8 +209,8 @@ class RankRunner(object): def sql_sector_places(self): return """SELECT place_id FROM placex - WHERE indexed_status > 0 and geometry_sector = %s - ORDER BY geometry_sector""" + WHERE indexed_status > 0 and rank_search = {} + and geometry_sector = %s""".format(self.rank) def sql_index_place(self): return "EXECUTE rnk_index(%s)" -- 2.39.5