]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/nominatim.py
Merge pull request #1902 from lonvia/avoid-touching-boundaries-in-addresses
[nominatim.git] / nominatim / nominatim.py
1 #! /usr/bin/env python3
2 #-----------------------------------------------------------------------------
3 # nominatim - [description]
4 #-----------------------------------------------------------------------------
5 #
6 # Indexing tool for the Nominatim database.
7 #
8 # Based on C version by Brian Quinion
9 #
10 # This program is free software; you can redistribute it and/or
11 # modify it under the terms of the GNU General Public License
12 # as published by the Free Software Foundation; either version 2
13 # of the License, or (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the GNU General Public License
21 # along with this program; if not, write to the Free Software
22 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
23 #-----------------------------------------------------------------------------
24
25 from argparse import ArgumentParser, RawDescriptionHelpFormatter, ArgumentTypeError
26 import logging
27 import sys
28 import re
29 import getpass
30 from datetime import datetime
31 import psycopg2
32 from psycopg2.extras import wait_select
33 import select
34
35 from indexer.progress import ProgressLogger
36
37 log = logging.getLogger()
38
39 def make_connection(options, asynchronous=False):
40     params = {'dbname' : options.dbname,
41               'user' : options.user,
42               'password' : options.password,
43               'host' : options.host,
44               'port' : options.port,
45               'async' : asynchronous}
46
47     return psycopg2.connect(**params)
48
49
50 class RankRunner(object):
51     """ Returns SQL commands for indexing one rank within the placex table.
52     """
53
54     def __init__(self, rank):
55         self.rank = rank
56
57     def name(self):
58         return "rank {}".format(self.rank)
59
60     def sql_count_objects(self):
61         return """SELECT count(*) FROM placex
62                   WHERE rank_search = {} and indexed_status > 0
63                """.format(self.rank)
64
65     def sql_get_objects(self):
66         return """SELECT place_id FROM placex
67                   WHERE indexed_status > 0 and rank_search = {}
68                   ORDER BY geometry_sector""".format(self.rank)
69
70     def sql_index_place(self, ids):
71         return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
72                .format(','.join((str(i) for i in ids)))
73
74
75 class InterpolationRunner(object):
76     """ Returns SQL commands for indexing the address interpolation table
77         location_property_osmline.
78     """
79
80     def name(self):
81         return "interpolation lines (location_property_osmline)"
82
83     def sql_count_objects(self):
84         return """SELECT count(*) FROM location_property_osmline
85                   WHERE indexed_status > 0"""
86
87     def sql_get_objects(self):
88         return """SELECT place_id FROM location_property_osmline
89                   WHERE indexed_status > 0
90                   ORDER BY geometry_sector"""
91
92     def sql_index_place(self, ids):
93         return """UPDATE location_property_osmline
94                   SET indexed_status = 0 WHERE place_id IN ({})"""\
95                .format(','.join((str(i) for i in ids)))
96
97
98 class DBConnection(object):
99     """ A single non-blocking database connection.
100     """
101
102     def __init__(self, options):
103         self.current_query = None
104         self.current_params = None
105
106         self.conn = None
107         self.connect()
108
109     def connect(self):
110         if self.conn is not None:
111             self.cursor.close()
112             self.conn.close()
113
114         self.conn = make_connection(options, asynchronous=True)
115         self.wait()
116
117         self.cursor = self.conn.cursor()
118         # Disable JIT and parallel workers as they are known to cause problems.
119         # Update pg_settings instead of using SET because it does not yield
120         # errors on older versions of Postgres where the settings are not
121         # implemented.
122         self.perform(
123             """ UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost';
124                 UPDATE pg_settings SET setting = 0 
125                    WHERE name = 'max_parallel_workers_per_gather';""")
126         self.wait()
127
128     def wait(self):
129         """ Block until any pending operation is done.
130         """
131         while True:
132             try:
133                 wait_select(self.conn)
134                 self.current_query = None
135                 return
136             except psycopg2.extensions.TransactionRollbackError as e:
137                 if e.pgcode == '40P01':
138                     log.info("Deadlock detected (params = {}), retry."
139                               .format(self.current_params))
140                     self.cursor.execute(self.current_query, self.current_params)
141                 else:
142                     raise
143             except psycopg2.errors.DeadlockDetected:
144                 self.cursor.execute(self.current_query, self.current_params)
145
146     def perform(self, sql, args=None):
147         """ Send SQL query to the server. Returns immediately without
148             blocking.
149         """
150         self.current_query = sql
151         self.current_params = args
152         self.cursor.execute(sql, args)
153
154     def fileno(self):
155         """ File descriptor to wait for. (Makes this class select()able.)
156         """
157         return self.conn.fileno()
158
159     def is_done(self):
160         """ Check if the connection is available for a new query.
161
162             Also checks if the previous query has run into a deadlock.
163             If so, then the previous query is repeated.
164         """
165         if self.current_query is None:
166             return True
167
168         try:
169             if self.conn.poll() == psycopg2.extensions.POLL_OK:
170                 self.current_query = None
171                 return True
172         except psycopg2.extensions.TransactionRollbackError as e:
173             if e.pgcode == '40P01':
174                 log.info("Deadlock detected (params = {}), retry.".format(self.current_params))
175                 self.cursor.execute(self.current_query, self.current_params)
176             else:
177                 raise
178         except psycopg2.errors.DeadlockDetected:
179             self.cursor.execute(self.current_query, self.current_params)
180
181         return False
182
183
184 class Indexer(object):
185     """ Main indexing routine.
186     """
187
188     def __init__(self, options):
189         self.minrank = max(0, options.minrank)
190         self.maxrank = min(30, options.maxrank)
191         self.conn = make_connection(options)
192         self.threads = [DBConnection(options) for i in range(options.threads)]
193
194     def run(self):
195         """ Run indexing over the entire database.
196         """
197         log.warning("Starting indexing rank ({} to {}) using {} threads".format(
198                  self.minrank, self.maxrank, len(self.threads)))
199
200         for rank in range(self.minrank, self.maxrank):
201             self.index(RankRunner(rank))
202
203         if self.maxrank == 30:
204             self.index(InterpolationRunner(), 20)
205
206         self.index(RankRunner(self.maxrank), 20)
207
208     def index(self, obj, batch=1):
209         """ Index a single rank or table. `obj` describes the SQL to use
210             for indexing. `batch` describes the number of objects that
211             should be processed with a single SQL statement
212         """
213         log.warning("Starting {}".format(obj.name()))
214
215         cur = self.conn.cursor()
216         cur.execute(obj.sql_count_objects())
217
218         total_tuples = cur.fetchone()[0]
219         log.debug("Total number of rows: {}".format(total_tuples))
220
221         cur.close()
222
223         next_thread = self.find_free_thread()
224         progress = ProgressLogger(obj.name(), total_tuples)
225
226         cur = self.conn.cursor(name='places')
227         cur.execute(obj.sql_get_objects())
228
229         while True:
230             places = [p[0] for p in cur.fetchmany(batch)]
231             if len(places) == 0:
232                 break
233
234             log.debug("Processing places: {}".format(places))
235             thread = next(next_thread)
236
237             thread.perform(obj.sql_index_place(places))
238             progress.add(len(places))
239
240         cur.close()
241
242         for t in self.threads:
243             t.wait()
244
245         progress.done()
246
247     def find_free_thread(self):
248         """ Generator that returns the next connection that is free for
249             sending a query.
250         """
251         ready = self.threads
252         command_stat = 0
253
254         while True:
255             for thread in ready:
256                 if thread.is_done():
257                     command_stat += 1
258                     yield thread
259
260             # refresh the connections occasionaly to avoid potential
261             # memory leaks in Postgresql.
262             if command_stat > 100000:
263                 for t in self.threads:
264                     while not t.is_done():
265                         t.wait()
266                     t.connect()
267                 command_stat = 0
268                 ready = self.threads
269             else:
270                 ready, _, _ = select.select(self.threads, [], [])
271
272         assert False, "Unreachable code"
273
274
275 def nominatim_arg_parser():
276     """ Setup the command-line parser for the tool.
277     """
278     def h(s):
279         return re.sub("\s\s+" , " ", s)
280
281     p = ArgumentParser(description="Indexing tool for Nominatim.",
282                        formatter_class=RawDescriptionHelpFormatter)
283
284     p.add_argument('-d', '--database',
285                    dest='dbname', action='store', default='nominatim',
286                    help='Name of the PostgreSQL database to connect to.')
287     p.add_argument('-U', '--username',
288                    dest='user', action='store',
289                    help='PostgreSQL user name.')
290     p.add_argument('-W', '--password',
291                    dest='password_prompt', action='store_true',
292                    help='Force password prompt.')
293     p.add_argument('-H', '--host',
294                    dest='host', action='store',
295                    help='PostgreSQL server hostname or socket location.')
296     p.add_argument('-P', '--port',
297                    dest='port', action='store',
298                    help='PostgreSQL server port')
299     p.add_argument('-r', '--minrank',
300                    dest='minrank', type=int, metavar='RANK', default=0,
301                    help='Minimum/starting rank.')
302     p.add_argument('-R', '--maxrank',
303                    dest='maxrank', type=int, metavar='RANK', default=30,
304                    help='Maximum/finishing rank.')
305     p.add_argument('-t', '--threads',
306                    dest='threads', type=int, metavar='NUM', default=1,
307                    help='Number of threads to create for indexing.')
308     p.add_argument('-v', '--verbose',
309                    dest='loglevel', action='count', default=0,
310                    help='Increase verbosity')
311
312     return p
313
314 if __name__ == '__main__':
315     logging.basicConfig(stream=sys.stderr, format='%(levelname)s: %(message)s')
316
317     options = nominatim_arg_parser().parse_args(sys.argv[1:])
318
319     log.setLevel(max(3 - options.loglevel, 0) * 10)
320
321     options.password = None
322     if options.password_prompt:
323         password = getpass.getpass("Database password: ")
324         options.password = password
325
326     Indexer(options).run()