2 Main workhorse for indexing the database (computing addresses).
7 from nominatim.indexer.progress import ProgressLogger
8 from nominatim.indexer import runners
9 from nominatim.db.async_connection import DBConnection
10 from nominatim.db.connection import connect
# Logger used throughout the indexer. Called without a name,
# logging.getLogger() returns the root logger.
12 LOG = logging.getLogger()
16 """ Main indexing routine.
# Store the indexer settings.
#   dsn         -- database connection string; other methods pass
#                  self.dsn to connect() and DBConnection().
#   num_threads -- number of parallel worker connections opened by
#                  _setup_connections().
# NOTE(review): self.dsn (and self.threads) are read by other methods, but
# their initialisation is not visible here -- confirm both are set in this
# constructor.
19 def __init__(self, dsn, num_threads):
21 self.num_threads = num_threads
def _setup_connections(self):
    """ Open `self.num_threads` DBConnection objects (from
        nominatim.db.async_connection) to `self.dsn` and store them
        in `self.threads`.
    """
    # Build the full list first so self.threads is only replaced once
    # every connection has been created.
    connections = []
    for _ in range(self.num_threads):
        connections.append(DBConnection(self.dsn))
    self.threads = connections
# Tear down the worker connections created by _setup_connections() by
# iterating over self.threads.
# NOTE(review): the loop body, and any reset of self.threads afterwards,
# is not visible here -- presumably each connection is closed; confirm.
29 def _close_connections(self):
30 for thread in self.threads:
# Run the complete indexing sequence. The visible steps, in order, are:
#   1. index_by_rank(0, 4)
#   2. index_boundaries(0, 30)
#   3. index_by_rank(5, 25)
#   4. index_by_rank(26, 30)
#   5. index_postcodes()
# A separate connection with autocommit enabled is opened, and 'ANALYSE'
# is executed on it.
# NOTE(review): the check of `analyse` and the exact points between steps
# where ANALYSE runs are not visible here -- confirm against the full file.
35 def index_full(self, analyse=True):
36 """ Index the complete database. This will first index boundaries
37 followed by all other objects. When `analyse` is True, then the
38 database will be analysed at the appropriate places to
39 ensure that database statistics are updated.
41 with connect(self.dsn) as conn:
42 conn.autocommit = True
46 with conn.cursor() as cur:
47 cur.execute('ANALYSE')
52 self.index_by_rank(0, 4)
55 self.index_boundaries(0, 30)
58 self.index_by_rank(5, 25)
61 self.index_by_rank(26, 30)
64 self.index_postcodes()
# Index administrative boundaries one rank at a time with BoundaryRunner.
# Worker connections are opened before the loop and closed after it.
# The loop range is range(max(minrank, 4), min(maxrank, 26)):
#   - it never goes below rank 4 or above rank 25;
#   - maxrank itself is excluded (e.g. maxrank=10 stops at rank 9).
# NOTE(review): the exclusive upper bound differs from index_by_rank(),
# whose docstring says its range is inclusive -- confirm this is intended.
# NOTE(review): whether the loop is wrapped in try/finally is not visible
# here -- confirm connections are also closed when _index() raises.
68 def index_boundaries(self, minrank, maxrank):
69 """ Index only administrative boundaries within the given rank range.
71 LOG.warning("Starting indexing boundaries using %s threads",
74 self._setup_connections()
77 for rank in range(max(minrank, 4), min(maxrank, 26)):
78 self._index(runners.BoundaryRunner(rank))
80 self._close_connections()
# Index placex rank by rank with RankRunner. maxrank is capped at 30.
# The loop covers ranks max(1, minrank) .. maxrank-1, so:
#   - rank 0 is never handled by the loop;
#   - maxrank itself is handled by a separate final step.
# Two alternative final steps are visible:
#   - RankRunner(0), InterpolationRunner() and RankRunner(30), the last
#     two with batch size 20;
#   - RankRunner(maxrank) alone.
# NOTE(review): the condition choosing between the two final steps is not
# visible here. Per the docstring it is presumably `maxrank == 30`; confirm.
82 def index_by_rank(self, minrank, maxrank):
83 """ Index all entries of placex in the given rank range (inclusive)
84 in order of their address rank.
86 When rank 30 is requested then also interpolations and
87 places with address rank 0 will be indexed.
89 maxrank = min(maxrank, 30)
90 LOG.warning("Starting indexing rank (%i to %i) using %i threads",
91 minrank, maxrank, self.num_threads)
93 self._setup_connections()
96 for rank in range(max(1, minrank), maxrank):
97 self._index(runners.RankRunner(rank))
100 self._index(runners.RankRunner(0))
101 self._index(runners.InterpolationRunner(), 20)
102 self._index(runners.RankRunner(30), 20)
104 self._index(runners.RankRunner(maxrank))
106 self._close_connections()
# Index the location_postcode table with PostcodeRunner, in batches of 20
# objects. Worker connections are opened first and closed afterwards.
109 def index_postcodes(self):
110 """Index the entries of the location_postcode table.
112 LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
114 self._setup_connections()
117 self._index(runners.PostcodeRunner(), 20)
119 self._close_connections()
# Mark the import as indexed: runs
# UPDATE import_status SET indexed = true on a fresh connection.
# NOTE(review): unlike index_full(), this block does not set
# conn.autocommit, and no commit is visible here -- confirm the UPDATE is
# committed.
121 def update_status_table(self):
122 """ Update the status in the status table to 'indexed'.
124 with connect(self.dsn) as conn:
125 with conn.cursor() as cur:
126 cur.execute('UPDATE import_status SET indexed = true')
# Index every object described by `runner`, spreading the work over the
# worker connections in self.threads. The steps are:
#   - count the objects with runner.sql_count_objects(), for the
#     ProgressLogger;
#   - read object rows through a cursor named 'places' (presumably a
#     server-side cursor, so the result set is streamed), `batch` rows at
#     a time via fetchmany(); the first column of each row is the id;
#   - hand each batch to the next free connection from find_free_thread(),
#     via thread.perform(runner.sql_index_place(places)) (presumably
#     non-blocking, given the async_connection module -- confirm);
#   - finally iterate over self.threads (the body is not visible;
#     presumably it waits for outstanding queries).
# NOTE(review): the loop around fetchmany() and its exit condition
# (presumably an empty batch) are not visible here -- confirm.
130 def _index(self, runner, batch=1):
131 """ Index a single rank or table. `runner` describes the SQL to use
132 for indexing. `batch` describes the number of objects that
133 should be processed with a single SQL statement.
135 LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
137 with connect(self.dsn) as conn:
138 with conn.cursor() as cur:
139 total_tuples = cur.scalar(runner.sql_count_objects())
140 LOG.debug("Total number of rows: %i", total_tuples)
144 progress = ProgressLogger(runner.name(), total_tuples)
147 with conn.cursor(name='places') as cur:
148 cur.execute(runner.sql_get_objects())
150 next_thread = self.find_free_thread()
152 places = [p[0] for p in cur.fetchmany(batch)]
156 LOG.debug("Processing places: %s", str(places))
157 thread = next(next_thread)
159 thread.perform(runner.sql_index_place(places))
160 progress.add(len(places))
164 for thread in self.threads:
# Generator handing out connections from self.threads whose is_done()
# reports them free.
#   - It blocks in select.select(self.threads, [], []) until a connection
#     is readable, so each connection object must be usable with select()
#     (i.e. expose fileno()) -- presumably DBConnection does; confirm.
#   - Once command_stat exceeds 100000, it waits until every connection is
#     done before continuing, as part of the connection refresh.
# NOTE(review): the incrementing and reset of command_stat, and the actual
# reconnect call, are not visible here.
# The trailing assert is only reached if the generator loop ever exits.
169 def find_free_thread(self):
170 """ Generator that returns the next connection that is free for
182 # refresh the connections occasionally to avoid potential
183 # memory leaks in PostgreSQL.
184 if command_stat > 100000:
185 for thread in self.threads:
186 while not thread.is_done():
192 ready, _, _ = select.select(self.threads, [], [])
194 assert False, "Unreachable code"