2 Main workhorse for indexing (computing addresses) the database.
4 # pylint: disable=C0111
10 from .progress import ProgressLogger
11 from ..db.async_connection import DBConnection
13 LOG = logging.getLogger()
16 """ Returns SQL commands for indexing one rank within the placex table.
19 def __init__(self, rank):
23 return "rank {}".format(self.rank)
25 def sql_count_objects(self):
26 return """SELECT count(*) FROM placex
27 WHERE rank_address = {} and indexed_status > 0
30 def sql_get_objects(self):
31 return """SELECT place_id FROM placex
32 WHERE indexed_status > 0 and rank_address = {}
33 ORDER BY geometry_sector""".format(self.rank)
def sql_index_place(ids):
    """ Build the UPDATE statement that sets indexed_status to 0 for
        the placex rows with the given place ids.

        `ids` is an iterable of place ids. Each id is converted with
        str() and the results are joined with commas to form the
        content of the IN (...) list.
    """
    id_list = ','.join(map(str, ids))
    template = "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"
    return template.format(id_list)
41 class InterpolationRunner:
42 """ Returns SQL commands for indexing the address interpolation table
43 location_property_osmline.
48 return "interpolation lines (location_property_osmline)"
51 def sql_count_objects():
52 return """SELECT count(*) FROM location_property_osmline
53 WHERE indexed_status > 0"""
56 def sql_get_objects():
57 return """SELECT place_id FROM location_property_osmline
58 WHERE indexed_status > 0
59 ORDER BY geometry_sector"""
62 def sql_index_place(ids):
63 return """UPDATE location_property_osmline
64 SET indexed_status = 0 WHERE place_id IN ({})
65 """.format(','.join((str(i) for i in ids)))
68 """ Returns SQL commands for indexing the administrative boundaries
72 def __init__(self, rank):
76 return "boundaries rank {}".format(self.rank)
78 def sql_count_objects(self):
79 return """SELECT count(*) FROM placex
80 WHERE indexed_status > 0
82 AND class = 'boundary' and type = 'administrative'
85 def sql_get_objects(self):
86 return """SELECT place_id FROM placex
87 WHERE indexed_status > 0 and rank_search = {}
88 and class = 'boundary' and type = 'administrative'
89 ORDER BY partition, admin_level
def sql_index_place(ids):
    """ Build the UPDATE statement that sets indexed_status to 0 for
        the placex rows with the given place ids.

        `ids` is an iterable of place ids. Each id is converted with
        str() and the results are joined with commas to form the
        content of the IN (...) list.
    """
    id_list = ','.join(map(str, ids))
    template = "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"
    return template.format(id_list)
99 """ Provides the SQL commands for indexing the location_postcode table.
104 return "postcodes (location_postcode)"
def sql_count_objects():
    """ Return the SQL query that counts the rows of location_postcode
        whose indexed_status is greater than 0.
    """
    query = 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0'
    return query
111 def sql_get_objects():
112 return """SELECT place_id FROM location_postcode
113 WHERE indexed_status > 0
114 ORDER BY country_code, postcode"""
117 def sql_index_place(ids):
118 return """UPDATE location_postcode SET indexed_status = 0
119 WHERE place_id IN ({})
120 """.format(','.join((str(i) for i in ids)))
123 """ Main indexing routine.
def __init__(self, dsn, num_threads):
    """ Set up the database connections used for indexing.

        `dsn` is handed unchanged to psycopg2.connect() and to every
        DBConnection. `num_threads` is the number of DBConnection
        objects that are created and stored in `self.threads`.
    """
    # Connection used directly by this object.
    self.conn = psycopg2.connect(dsn)
    # One DBConnection per requested thread, all built from the same DSN.
    workers = [DBConnection(dsn) for _ in range(num_threads)]
    self.threads = workers
131 def index_full(self, analyse=True):
132 """ Index the complete database. This will first index boundaries
133 followed by all other objects. When `analyse` is True, then the
134 database will be analysed at the appropriate places to
135 ensure that database statistics are updated.
137 self.index_by_rank(0, 4)
138 self._analyse_db_if(analyse)
140 self.index_boundaries(0, 30)
141 self._analyse_db_if(analyse)
143 self.index_by_rank(5, 25)
144 self._analyse_db_if(analyse)
146 self.index_by_rank(26, 30)
147 self._analyse_db_if(analyse)
149 self.index_postcodes()
150 self._analyse_db_if(analyse)
152 def _analyse_db_if(self, condition):
154 with self.conn.cursor() as cur:
155 cur.execute('ANALYSE')
157 def index_boundaries(self, minrank, maxrank):
158 """ Index only administrative boundaries within the given rank range.
160 LOG.warning("Starting indexing boundaries using %s threads",
163 for rank in range(max(minrank, 4), min(maxrank, 26)):
164 self.index(BoundaryRunner(rank))
166 def index_by_rank(self, minrank, maxrank):
167 """ Index all entries of placex in the given rank range (inclusive)
168 in order of their address rank.
170 When rank 30 is requested then also interpolations and
171 places with address rank 0 will be indexed.
173 maxrank = min(maxrank, 30)
174 LOG.warning("Starting indexing rank (%i to %i) using %i threads",
175 minrank, maxrank, len(self.threads))
177 for rank in range(max(1, minrank), maxrank):
178 self.index(RankRunner(rank))
181 self.index(RankRunner(0))
182 self.index(InterpolationRunner(), 20)
183 self.index(RankRunner(30), 20)
185 self.index(RankRunner(maxrank))
188 def index_postcodes(self):
189 """Index the entries of the location_postcode table.
191 self.index(PostcodeRunner(), 20)
193 def update_status_table(self):
194 """ Update the status in the status table to 'indexed'.
196 with self.conn.cursor() as cur:
197 cur.execute('UPDATE import_status SET indexed = true')
200 def index(self, obj, batch=1):
201 """ Index a single rank or table. `obj` describes the SQL to use
202 for indexing. `batch` describes the number of objects that
203 should be processed with a single SQL statement
205 LOG.warning("Starting %s (using batch size %s)", obj.name(), batch)
207 cur = self.conn.cursor()
208 cur.execute(obj.sql_count_objects())
210 total_tuples = cur.fetchone()[0]
211 LOG.debug("Total number of rows: %i", total_tuples)
215 progress = ProgressLogger(obj.name(), total_tuples)
218 cur = self.conn.cursor(name='places')
219 cur.execute(obj.sql_get_objects())
221 next_thread = self.find_free_thread()
223 places = [p[0] for p in cur.fetchmany(batch)]
227 LOG.debug("Processing places: %s", str(places))
228 thread = next(next_thread)
230 thread.perform(obj.sql_index_place(places))
231 progress.add(len(places))
235 for thread in self.threads:
240 def find_free_thread(self):
241 """ Generator that returns the next connection that is free for
253 # refresh the connections occasionally to avoid potential
254 # memory leaks in PostgreSQL.
255 if command_stat > 100000:
256 for thread in self.threads:
257 while not thread.is_done():
263 ready, _, _ = select.select(self.threads, [], [])
265 assert False, "Unreachable code"