2 Main workhorse for indexing (computing addresses) the database.
9 from nominatim.indexer.progress import ProgressLogger
10 from nominatim.indexer import runners
11 from nominatim.db.async_connection import DBConnection
13 LOG = logging.getLogger()
16 def _analyse_db_if(conn, condition):
18 with conn.cursor() as cur:
19 cur.execute('ANALYSE')
23 """ Main indexing routine.
26 def __init__(self, dsn, num_threads):
28 self.num_threads = num_threads
def _setup_connections(self):
    """ Open the database connections used for indexing.

        Stores a psycopg2 connection for `self.dsn` in `self.conn` and
        fills `self.threads` with `self.num_threads` DBConnection objects
        created from the same DSN.
    """
    self.conn = psycopg2.connect(self.dsn)

    # Build the complete list first; `self.threads` is only replaced
    # once every DBConnection has been created.
    workers = []
    for _ in range(self.num_threads):
        workers.append(DBConnection(self.dsn))
    self.threads = workers
38 def _close_connections(self):
43 for thread in self.threads:
48 def index_full(self, analyse=True):
49 """ Index the complete database. This will first index boundaries
50 followed by all other objects. When `analyse` is True, then the
51 database will be analysed at the appropriate places to
52 ensure that database statistics are updated.
54 conn = psycopg2.connect(self.dsn)
55 conn.autocommit = True
58 self.index_by_rank(0, 4)
59 _analyse_db_if(conn, analyse)
61 self.index_boundaries(0, 30)
62 _analyse_db_if(conn, analyse)
64 self.index_by_rank(5, 25)
65 _analyse_db_if(conn, analyse)
67 self.index_by_rank(26, 30)
68 _analyse_db_if(conn, analyse)
70 self.index_postcodes()
71 _analyse_db_if(conn, analyse)
76 def index_boundaries(self, minrank, maxrank):
77 """ Index only administrative boundaries within the given rank range.
79 LOG.warning("Starting indexing boundaries using %s threads",
82 self._setup_connections()
85 for rank in range(max(minrank, 4), min(maxrank, 26)):
86 self.index(runners.BoundaryRunner(rank))
88 self._close_connections()
90 def index_by_rank(self, minrank, maxrank):
91 """ Index all entries of placex in the given rank range (inclusive)
92 in order of their address rank.
94 When rank 30 is requested then also interpolations and
95 places with address rank 0 will be indexed.
97 maxrank = min(maxrank, 30)
98 LOG.warning("Starting indexing rank (%i to %i) using %i threads",
99 minrank, maxrank, self.num_threads)
101 self._setup_connections()
104 for rank in range(max(1, minrank), maxrank):
105 self.index(runners.RankRunner(rank))
108 self.index(runners.RankRunner(0))
109 self.index(runners.InterpolationRunner(), 20)
110 self.index(runners.RankRunner(30), 20)
112 self.index(runners.RankRunner(maxrank))
114 self._close_connections()
117 def index_postcodes(self):
118 """Index the entries of the location_postcode table.
120 LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
122 self._setup_connections()
125 self.index(runners.PostcodeRunner(), 20)
127 self._close_connections()
129 def update_status_table(self):
130 """ Update the status in the status table to 'indexed'.
132 conn = psycopg2.connect(self.dsn)
135 with conn.cursor() as cur:
136 cur.execute('UPDATE import_status SET indexed = true')
142 def index(self, obj, batch=1):
143 """ Index a single rank or table. `obj` describes the SQL to use
144 for indexing. `batch` describes the number of objects that
145 should be processed with a single SQL statement
147 LOG.warning("Starting %s (using batch size %s)", obj.name(), batch)
149 cur = self.conn.cursor()
150 cur.execute(obj.sql_count_objects())
152 total_tuples = cur.fetchone()[0]
153 LOG.debug("Total number of rows: %i", total_tuples)
157 progress = ProgressLogger(obj.name(), total_tuples)
160 cur = self.conn.cursor(name='places')
161 cur.execute(obj.sql_get_objects())
163 next_thread = self.find_free_thread()
165 places = [p[0] for p in cur.fetchmany(batch)]
169 LOG.debug("Processing places: %s", str(places))
170 thread = next(next_thread)
172 thread.perform(obj.sql_index_place(places))
173 progress.add(len(places))
177 for thread in self.threads:
182 def find_free_thread(self):
183 """ Generator that returns the next connection that is free for
195 # refresh the connections occasionally to avoid potential
196 # memory leaks in PostgreSQL.
197 if command_stat > 100000:
198 for thread in self.threads:
199 while not thread.is_done():
205 ready, _, _ = select.select(self.threads, [], [])
207 assert False, "Unreachable code"