]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/nominatim.py
Merge pull request #1813 from lonvia/revert-concurrent-indexing
[nominatim.git] / nominatim / nominatim.py
1 #! /usr/bin/env python3
2 #-----------------------------------------------------------------------------
3 # nominatim - [description]
4 #-----------------------------------------------------------------------------
5 #
6 # Indexing tool for the Nominatim database.
7 #
8 # Based on C version by Brian Quinion
9 #
10 # This program is free software; you can redistribute it and/or
11 # modify it under the terms of the GNU General Public License
12 # as published by the Free Software Foundation; either version 2
13 # of the License, or (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the GNU General Public License
21 # along with this program; if not, write to the Free Software
22 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
23 #-----------------------------------------------------------------------------
24
25 from argparse import ArgumentParser, RawDescriptionHelpFormatter, ArgumentTypeError
26 import logging
27 import sys
28 import re
29 import getpass
30 from datetime import datetime
31 import psycopg2
32 from psycopg2.extras import wait_select
33 import select
34
35 log = logging.getLogger()
36
37 def make_connection(options, asynchronous=False):
38     params = {'dbname' : options.dbname,
39               'user' : options.user,
40               'password' : options.password,
41               'host' : options.host,
42               'port' : options.port,
43               'async' : asynchronous}
44
45     return psycopg2.connect(**params)
46
47
48 class RankRunner(object):
49     """ Returns SQL commands for indexing one rank within the placex table.
50     """
51
52     def __init__(self, rank):
53         self.rank = rank
54
55     def name(self):
56         return "rank {}".format(self.rank)
57
58     def sql_index_sectors(self):
59         return """SELECT geometry_sector, count(*) FROM placex
60                   WHERE rank_search = {} and indexed_status > 0
61                   GROUP BY geometry_sector
62                   ORDER BY geometry_sector""".format(self.rank)
63
64     def sql_nosector_places(self):
65         return """SELECT place_id FROM placex
66                   WHERE indexed_status > 0 and rank_search = {}
67                   ORDER BY geometry_sector""".format(self.rank)
68
69     def sql_sector_places(self):
70         return """SELECT place_id FROM placex
71                   WHERE indexed_status > 0 and rank_search = {}
72                         and geometry_sector = %s""".format(self.rank)
73
74     def sql_index_place(self):
75         return "UPDATE placex SET indexed_status = 0 WHERE place_id = %s"
76
77
78 class InterpolationRunner(object):
79     """ Returns SQL commands for indexing the address interpolation table
80         location_property_osmline.
81     """
82
83     def name(self):
84         return "interpolation lines (location_property_osmline)"
85
86     def sql_index_sectors(self):
87         return """SELECT geometry_sector, count(*) FROM location_property_osmline
88                   WHERE indexed_status > 0
89                   GROUP BY geometry_sector
90                   ORDER BY geometry_sector"""
91
92     def sql_nosector_places(self):
93         return """SELECT place_id FROM location_property_osmline
94                   WHERE indexed_status > 0
95                   ORDER BY geometry_sector"""
96
97     def sql_sector_places(self):
98         return """SELECT place_id FROM location_property_osmline
99                   WHERE indexed_status > 0 and geometry_sector = %s
100                   ORDER BY geometry_sector"""
101
102     def sql_index_place(self):
103         return """UPDATE location_property_osmline
104                   SET indexed_status = 0 WHERE place_id = %s"""
105
106
107 class DBConnection(object):
108     """ A single non-blocking database connection.
109     """
110
111     def __init__(self, options):
112         self.current_query = None
113         self.current_params = None
114
115         self.conn = None
116         self.connect()
117
118     def connect(self):
119         if self.conn is not None:
120             self.cursor.close()
121             self.conn.close()
122
123         self.conn = make_connection(options, asynchronous=True)
124         self.wait()
125
126         self.cursor = self.conn.cursor()
127         # Disable JIT and parallel workers as they are known to cause problems.
128         # Update pg_settings instead of using SET because it does not yield
129         # errors on older versions of Postgres where the settings are not
130         # implemented.
131         self.perform(
132             """ UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost';
133                 UPDATE pg_settings SET setting = 0 
134                    WHERE name = 'max_parallel_workers_per_gather';""")
135         self.wait()
136
137     def wait(self):
138         """ Block until any pending operation is done.
139         """
140         while True:
141             try:
142                 wait_select(self.conn)
143                 self.current_query = None
144                 return
145             except psycopg2.extensions.TransactionRollbackError as e:
146                 if e.pgcode == '40P01':
147                     log.info("Deadlock detected (params = {}), retry."
148                               .format(self.current_params))
149                     self.cursor.execute(self.current_query, self.current_params)
150                 else:
151                     raise
152             except psycopg2.errors.DeadlockDetected:
153                 self.cursor.execute(self.current_query, self.current_params)
154
155     def perform(self, sql, args=None):
156         """ Send SQL query to the server. Returns immediately without
157             blocking.
158         """
159         self.current_query = sql
160         self.current_params = args
161         self.cursor.execute(sql, args)
162
163     def fileno(self):
164         """ File descriptor to wait for. (Makes this class select()able.)
165         """
166         return self.conn.fileno()
167
168     def is_done(self):
169         """ Check if the connection is available for a new query.
170
171             Also checks if the previous query has run into a deadlock.
172             If so, then the previous query is repeated.
173         """
174         if self.current_query is None:
175             return True
176
177         try:
178             if self.conn.poll() == psycopg2.extensions.POLL_OK:
179                 self.current_query = None
180                 return True
181         except psycopg2.extensions.TransactionRollbackError as e:
182             if e.pgcode == '40P01':
183                 log.info("Deadlock detected (params = {}), retry.".format(self.current_params))
184                 self.cursor.execute(self.current_query, self.current_params)
185             else:
186                 raise
187         except psycopg2.errors.DeadlockDetected:
188             self.cursor.execute(self.current_query, self.current_params)
189
190         return False
191
192
193 class Indexer(object):
194     """ Main indexing routine.
195     """
196
197     def __init__(self, options):
198         self.minrank = max(0, options.minrank)
199         self.maxrank = min(30, options.maxrank)
200         self.conn = make_connection(options)
201         self.threads = [DBConnection(options) for i in range(options.threads)]
202
203     def run(self):
204         """ Run indexing over the entire database.
205         """
206         log.warning("Starting indexing rank ({} to {}) using {} threads".format(
207                  self.minrank, self.maxrank, len(self.threads)))
208
209         for rank in range(self.minrank, self.maxrank):
210             self.index(RankRunner(rank))
211
212         if self.maxrank == 30:
213             self.index(InterpolationRunner())
214
215         self.index(RankRunner(self.maxrank))
216
217     def index(self, obj):
218         """ Index a single rank or table. `obj` describes the SQL to use
219             for indexing.
220         """
221         log.warning("Starting {}".format(obj.name()))
222
223         cur = self.conn.cursor(name='main')
224         cur.execute(obj.sql_index_sectors())
225
226         total_tuples = 0
227         for r in cur:
228             total_tuples += r[1]
229         log.debug("Total number of rows; {}".format(total_tuples))
230
231         cur.scroll(0, mode='absolute')
232
233         next_thread = self.find_free_thread()
234         done_tuples = 0
235         rank_start_time = datetime.now()
236
237         sector_sql = obj.sql_sector_places()
238         index_sql = obj.sql_index_place()
239         min_grouped_tuples = total_tuples - len(self.threads) * 1000
240
241         next_info = 100 if log.isEnabledFor(logging.INFO) else total_tuples + 1
242
243         for r in cur:
244             sector = r[0]
245
246             # Should we do the remaining ones together?
247             do_all = done_tuples > min_grouped_tuples
248
249             pcur = self.conn.cursor(name='places')
250
251             if do_all:
252                 pcur.execute(obj.sql_nosector_places())
253             else:
254                 pcur.execute(sector_sql, (sector, ))
255
256             for place in pcur:
257                 place_id = place[0]
258                 log.debug("Processing place {}".format(place_id))
259                 thread = next(next_thread)
260
261                 thread.perform(index_sql, (place_id,))
262                 done_tuples += 1
263
264                 if done_tuples >= next_info:
265                     now = datetime.now()
266                     done_time = (now - rank_start_time).total_seconds()
267                     tuples_per_sec = done_tuples / done_time
268                     log.info("Done {} in {} @ {:.3f} per second - {} ETA (seconds): {:.2f}"
269                            .format(done_tuples, int(done_time),
270                                    tuples_per_sec, obj.name(),
271                                    (total_tuples - done_tuples)/tuples_per_sec))
272                     next_info += int(tuples_per_sec)
273
274             pcur.close()
275
276             if do_all:
277                 break
278
279         cur.close()
280
281         for t in self.threads:
282             t.wait()
283
284         rank_end_time = datetime.now()
285         diff_seconds = (rank_end_time-rank_start_time).total_seconds()
286
287         log.warning("Done {}/{} in {} @ {:.3f} per second - FINISHED {}\n".format(
288                  done_tuples, total_tuples, int(diff_seconds),
289                  done_tuples/diff_seconds, obj.name()))
290
291     def find_free_thread(self):
292         """ Generator that returns the next connection that is free for
293             sending a query.
294         """
295         ready = self.threads
296         command_stat = 0
297
298         while True:
299             for thread in ready:
300                 if thread.is_done():
301                     command_stat += 1
302                     yield thread
303
304             # refresh the connections occasionaly to avoid potential
305             # memory leaks in Postgresql.
306             if command_stat > 100000:
307                 for t in self.threads:
308                     while not t.is_done():
309                         t.wait()
310                     t.connect()
311                 command_stat = 0
312                 ready = self.threads
313             else:
314                 ready, _, _ = select.select(self.threads, [], [])
315
316         assert False, "Unreachable code"
317
318
319 def nominatim_arg_parser():
320     """ Setup the command-line parser for the tool.
321     """
322     def h(s):
323         return re.sub("\s\s+" , " ", s)
324
325     p = ArgumentParser(description="Indexing tool for Nominatim.",
326                        formatter_class=RawDescriptionHelpFormatter)
327
328     p.add_argument('-d', '--database',
329                    dest='dbname', action='store', default='nominatim',
330                    help='Name of the PostgreSQL database to connect to.')
331     p.add_argument('-U', '--username',
332                    dest='user', action='store',
333                    help='PostgreSQL user name.')
334     p.add_argument('-W', '--password',
335                    dest='password_prompt', action='store_true',
336                    help='Force password prompt.')
337     p.add_argument('-H', '--host',
338                    dest='host', action='store',
339                    help='PostgreSQL server hostname or socket location.')
340     p.add_argument('-P', '--port',
341                    dest='port', action='store',
342                    help='PostgreSQL server port')
343     p.add_argument('-r', '--minrank',
344                    dest='minrank', type=int, metavar='RANK', default=0,
345                    help='Minimum/starting rank.')
346     p.add_argument('-R', '--maxrank',
347                    dest='maxrank', type=int, metavar='RANK', default=30,
348                    help='Maximum/finishing rank.')
349     p.add_argument('-t', '--threads',
350                    dest='threads', type=int, metavar='NUM', default=1,
351                    help='Number of threads to create for indexing.')
352     p.add_argument('-v', '--verbose',
353                    dest='loglevel', action='count', default=0,
354                    help='Increase verbosity')
355
356     return p
357
358 if __name__ == '__main__':
359     logging.basicConfig(stream=sys.stderr, format='%(levelname)s: %(message)s')
360
361     options = nominatim_arg_parser().parse_args(sys.argv[1:])
362
363     log.setLevel(max(3 - options.loglevel, 0) * 10)
364
365     options.password = None
366     if options.password_prompt:
367         password = getpass.getpass("Database password: ")
368         options.password = password
369
370     Indexer(options).run()