git.openstreetmap.org Git - nominatim.git/blobdiff - nominatim/indexer/indexer.py
move postcode normalization into tokenizer
[nominatim.git] / nominatim / indexer / indexer.py
index a064b28580168cca6cb26a32bc1e398ea3f43442..d0c6ea0ca66c8c0927cbfb718e6cd7518bbe8238 100644 (file)
@@ -4,6 +4,8 @@ Main work horse for indexing (computing addresses) the database.
 import logging
 import select
 
+import psycopg2.extras
+
 from nominatim.indexer.progress import ProgressLogger
 from nominatim.indexer import runners
 from nominatim.db.async_connection import DBConnection
@@ -62,6 +64,7 @@ class WorkerPool:
                         thread.wait()
                     thread.connect()
                 ready = self.threads
+                command_stat = 0
             else:
                 _, ready, _ = select.select([], self.threads, [])
 
@@ -78,8 +81,9 @@ class Indexer:
     """ Main indexing routine.
     """
 
-    def __init__(self, dsn, num_threads):
+    def __init__(self, dsn, tokenizer, num_threads):
         self.dsn = dsn
+        self.tokenizer = tokenizer
         self.num_threads = num_threads
 
 
@@ -122,8 +126,9 @@ class Indexer:
         LOG.warning("Starting indexing boundaries using %s threads",
                     self.num_threads)
 
-        for rank in range(max(minrank, 4), min(maxrank, 26)):
-            self._index(runners.BoundaryRunner(rank))
+        with self.tokenizer.name_analyzer() as analyzer:
+            for rank in range(max(minrank, 4), min(maxrank, 26)):
+                self._index(runners.BoundaryRunner(rank, analyzer))
 
     def index_by_rank(self, minrank, maxrank):
         """ Index all entries of placex in the given rank range (inclusive)
@@ -136,15 +141,16 @@ class Indexer:
         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                     minrank, maxrank, self.num_threads)
 
-        for rank in range(max(1, minrank), maxrank):
-            self._index(runners.RankRunner(rank))
+        with self.tokenizer.name_analyzer() as analyzer:
+            for rank in range(max(1, minrank), maxrank):
+                self._index(runners.RankRunner(rank, analyzer))
 
-        if maxrank == 30:
-            self._index(runners.RankRunner(0))
-            self._index(runners.InterpolationRunner(), 20)
-            self._index(runners.RankRunner(30), 20)
-        else:
-            self._index(runners.RankRunner(maxrank))
+            if maxrank == 30:
+                self._index(runners.RankRunner(0, analyzer))
+                self._index(runners.InterpolationRunner(analyzer), 20)
+                self._index(runners.RankRunner(30, analyzer), 20)
+            else:
+                self._index(runners.RankRunner(maxrank, analyzer))
 
 
     def index_postcodes(self):
@@ -172,6 +178,7 @@ class Indexer:
         LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
 
         with connect(self.dsn) as conn:
+            psycopg2.extras.register_hstore(conn)
             with conn.cursor() as cur:
                 total_tuples = cur.scalar(runner.sql_count_objects())
                 LOG.debug("Total number of rows: %i", total_tuples)
@@ -186,14 +193,14 @@ class Indexer:
 
                     with WorkerPool(self.dsn, self.num_threads) as pool:
                         while True:
-                            places = [p[0] for p in cur.fetchmany(batch)]
+                            places = cur.fetchmany(batch)
                             if not places:
                                 break
 
                             LOG.debug("Processing places: %s", str(places))
                             worker = pool.next_free_worker()
 
-                            worker.perform(runner.sql_index_place(places))
+                            runner.index_places(worker, places)
                             progress.add(len(places))
 
                         pool.finish_all()