2 Main workhorse for indexing (computing addresses) the database.
4 # pylint: disable=C0111
10 from .progress import ProgressLogger
11 from ..db.async_connection import DBConnection
# Module-wide logger. getLogger() called without a name returns the root
# logger, so messages go through whatever handlers the application set up.
LOG = logging.getLogger()
16 """ Returns SQL commands for indexing one rank within the placex table.
19 def __init__(self, rank):
23 return "rank {}".format(self.rank)
25 def sql_count_objects(self):
26 return """SELECT count(*) FROM placex
27 WHERE rank_address = {} and indexed_status > 0
30 def sql_get_objects(self):
31 return """SELECT place_id FROM placex
32 WHERE indexed_status > 0 and rank_address = {}
33 ORDER BY geometry_sector""".format(self.rank)
def sql_index_place(ids):
    """ Return the UPDATE statement that sets indexed_status to 0 in
        placex for the given place ids.

        `ids` is an iterable of place ids. Each id is converted with
        str() and the results are inserted, comma-separated and without
        spaces, into the IN list of the statement.
    """
    template = "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"
    return template.format(','.join(map(str, ids)))
41 class InterpolationRunner:
42 """ Returns SQL commands for indexing the address interpolation table
43 location_property_osmline.
48 return "interpolation lines (location_property_osmline)"
51 def sql_count_objects():
52 return """SELECT count(*) FROM location_property_osmline
53 WHERE indexed_status > 0"""
56 def sql_get_objects():
57 return """SELECT place_id FROM location_property_osmline
58 WHERE indexed_status > 0
59 ORDER BY geometry_sector"""
62 def sql_index_place(ids):
63 return """UPDATE location_property_osmline
64 SET indexed_status = 0 WHERE place_id IN ({})
65 """.format(','.join((str(i) for i in ids)))
68 """ Returns SQL commands for indexing the administrative boundaries
72 def __init__(self, rank):
76 return "boundaries rank {}".format(self.rank)
78 def sql_count_objects(self):
79 return """SELECT count(*) FROM placex
80 WHERE indexed_status > 0
82 AND class = 'boundary' and type = 'administrative'
85 def sql_get_objects(self):
86 return """SELECT place_id FROM placex
87 WHERE indexed_status > 0 and rank_search = {}
88 and class = 'boundary' and type = 'administrative'
89 ORDER BY partition, admin_level
def sql_index_place(ids):
    """ Build the UPDATE statement that sets indexed_status to 0 in
        placex for every place id in `ids`.

        The ids are stringified and joined with commas (no spaces) to
        form the IN list of the statement.
    """
    id_strings = [str(place_id) for place_id in ids]
    statement = "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"
    return statement.format(','.join(id_strings))
99 """ Provides the SQL commands for indexing the location_postcode table.
104 return "postcodes (location_postcode)"
def sql_count_objects():
    """ Return the query that counts the rows of location_postcode
        whose indexed_status is greater than 0.
    """
    return ('SELECT count(*) FROM location_postcode'
            ' WHERE indexed_status > 0')
111 def sql_get_objects():
112 return """SELECT place_id FROM location_postcode
113 WHERE indexed_status > 0
114 ORDER BY country_code, postcode"""
117 def sql_index_place(ids):
118 return """UPDATE location_postcode SET indexed_status = 0
119 WHERE place_id IN ({})
120 """.format(','.join((str(i) for i in ids)))
123 def _analyse_db_if(conn, condition):
125 with conn.cursor() as cur:
126 cur.execute('ANALYSE')
130 """ Main indexing routine.
133 def __init__(self, dsn, num_threads):
135 self.num_threads = num_threads
140 def _setup_connections(self):
141 self.conn = psycopg2.connect(self.dsn)
142 self.threads = [DBConnection(self.dsn) for _ in range(self.num_threads)]
145 def _close_connections(self):
150 for thread in self.threads:
155 def index_full(self, analyse=True):
156 """ Index the complete database. This will first index boundaries
157 followed by all other objects. When `analyse` is True, then the
158 database will be analysed at the appropriate places to
159 ensure that database statistics are updated.
161 conn = psycopg2.connect(self.dsn)
162 conn.autocommit = True
165 self.index_by_rank(0, 4)
166 _analyse_db_if(conn, analyse)
168 self.index_boundaries(0, 30)
169 _analyse_db_if(conn, analyse)
171 self.index_by_rank(5, 25)
172 _analyse_db_if(conn, analyse)
174 self.index_by_rank(26, 30)
175 _analyse_db_if(conn, analyse)
177 self.index_postcodes()
178 _analyse_db_if(conn, analyse)
183 def index_boundaries(self, minrank, maxrank):
184 """ Index only administrative boundaries within the given rank range.
186 LOG.warning("Starting indexing boundaries using %s threads",
189 self._setup_connections()
192 for rank in range(max(minrank, 4), min(maxrank, 26)):
193 self.index(BoundaryRunner(rank))
195 self._close_connections()
197 def index_by_rank(self, minrank, maxrank):
198 """ Index all entries of placex in the given rank range (inclusive)
199 in order of their address rank.
201 When rank 30 is requested then also interpolations and
202 places with address rank 0 will be indexed.
204 maxrank = min(maxrank, 30)
205 LOG.warning("Starting indexing rank (%i to %i) using %i threads",
206 minrank, maxrank, self.num_threads)
208 self._setup_connections()
211 for rank in range(max(1, minrank), maxrank):
212 self.index(RankRunner(rank))
215 self.index(RankRunner(0))
216 self.index(InterpolationRunner(), 20)
217 self.index(RankRunner(30), 20)
219 self.index(RankRunner(maxrank))
221 self._close_connections()
224 def index_postcodes(self):
225 """Index the entries of the location_postcode table.
227 LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
229 self._setup_connections()
232 self.index(PostcodeRunner(), 20)
234 self._close_connections()
236 def update_status_table(self):
237 """ Update the status in the status table to 'indexed'.
239 conn = psycopg2.connect(self.dsn)
242 with conn.cursor() as cur:
243 cur.execute('UPDATE import_status SET indexed = true')
249 def index(self, obj, batch=1):
250 """ Index a single rank or table. `obj` describes the SQL to use
251 for indexing. `batch` describes the number of objects that
252 should be processed with a single SQL statement
254 LOG.warning("Starting %s (using batch size %s)", obj.name(), batch)
256 cur = self.conn.cursor()
257 cur.execute(obj.sql_count_objects())
259 total_tuples = cur.fetchone()[0]
260 LOG.debug("Total number of rows: %i", total_tuples)
264 progress = ProgressLogger(obj.name(), total_tuples)
267 cur = self.conn.cursor(name='places')
268 cur.execute(obj.sql_get_objects())
270 next_thread = self.find_free_thread()
272 places = [p[0] for p in cur.fetchmany(batch)]
276 LOG.debug("Processing places: %s", str(places))
277 thread = next(next_thread)
279 thread.perform(obj.sql_index_place(places))
280 progress.add(len(places))
284 for thread in self.threads:
289 def find_free_thread(self):
290 """ Generator that returns the next connection that is free for
302 # refresh the connections occasionally to avoid potential
303 # memory leaks in Postgresql.
304 if command_stat > 100000:
305 for thread in self.threads:
306 while not thread.is_done():
312 ready, _, _ = select.select(self.threads, [], [])
314 assert False, "Unreachable code"