git.openstreetmap.org Git - nominatim.git/commitdiff
properly close connections of indexer after use
author Sarah Hoffmann <lonvia@denofr.de>
Fri, 26 Feb 2021 11:10:54 +0000 (12:10 +0100)
committer Sarah Hoffmann <lonvia@denofr.de>
Fri, 26 Feb 2021 11:10:54 +0000 (12:10 +0100)
nominatim/indexer/indexer.py

index 61971497f957f15aeda81480f68e0d25cba32593..d997e522490735994080df726e256662a4c0c632 100644 (file)
@@ -124,8 +124,25 @@ class Indexer:
     """
 
     def __init__(self, dsn, num_threads):
     """
 
     def __init__(self, dsn, num_threads):
-        self.conn = psycopg2.connect(dsn)
-        self.threads = [DBConnection(dsn) for _ in range(num_threads)]
+        self.dsn = dsn
+        self.num_threads = num_threads
+        self.conn = None
+        self.threads = []
+
+
+    def _setup_connections(self):
+        """ Open the main psycopg2 connection and one DBConnection per
+            configured thread, all using the stored DSN.
+        """
+        self.conn = psycopg2.connect(self.dsn)
+
+        # Collect the workers first and publish them in one assignment.
+        workers = []
+        for _ in range(self.num_threads):
+            workers.append(DBConnection(self.dsn))
+        self.threads = workers
+
+
+    def _close_connections(self):
+        """ Close the main connection (if one is open) and every worker
+            connection, then drop the references to them.
+        """
+        if self.conn:
+            self.conn.close()
+            self.conn = None
+
+        for thread in self.threads:
+            thread.close()
+        # Reset the attribute itself. A bare `threads = []` only binds a
+        # local name and would leave the closed connections in self.threads.
+        self.threads = []
 
 
     def index_full(self, analyse=True):
 
 
     def index_full(self, analyse=True):
@@ -134,34 +151,44 @@ class Indexer:
             database will be analysed at the appropriate places to
             ensure that database statistics are updated.
         """
             database will be analysed at the appropriate places to
             ensure that database statistics are updated.
         """
-        self.index_by_rank(0, 4)
-        self._analyse_db_if(analyse)
+        conn = psycopg2.connect(self.dsn)
+
+        try:
+            self.index_by_rank(0, 4)
+            self._analyse_db_if(conn, analyse)
 
 
-        self.index_boundaries(0, 30)
-        self._analyse_db_if(analyse)
+            self.index_boundaries(0, 30)
+            self._analyse_db_if(conn, analyse)
 
 
-        self.index_by_rank(5, 25)
-        self._analyse_db_if(analyse)
+            self.index_by_rank(5, 25)
+            self._analyse_db_if(conn, analyse)
 
 
-        self.index_by_rank(26, 30)
-        self._analyse_db_if(analyse)
+            self.index_by_rank(26, 30)
+            self._analyse_db_if(conn, analyse)
 
 
-        self.index_postcodes()
-        self._analyse_db_if(analyse)
+            self.index_postcodes()
+            self._analyse_db_if(conn, analyse)
+        finally:
+            conn.close()
 
 
-    def _analyse_db_if(self, condition):
+    def _analyse_db_if(self, conn, condition):
         if condition:
         if condition:
-            with self.conn.cursor() as cur:
+            with conn.cursor() as cur:
                 cur.execute('ANALYSE')
 
     def index_boundaries(self, minrank, maxrank):
         """ Index only administrative boundaries within the given rank range.
         """
         LOG.warning("Starting indexing boundaries using %s threads",
                 cur.execute('ANALYSE')
 
     def index_boundaries(self, minrank, maxrank):
         """ Index only administrative boundaries within the given rank range.
         """
         LOG.warning("Starting indexing boundaries using %s threads",
-                    len(self.threads))
+                    self.num_threads)
+
+        self._setup_connections()
 
 
-        for rank in range(max(minrank, 4), min(maxrank, 26)):
-            self.index(BoundaryRunner(rank))
+        try:
+            for rank in range(max(minrank, 4), min(maxrank, 26)):
+                self.index(BoundaryRunner(rank))
+        finally:
+            self._close_connections()
 
     def index_by_rank(self, minrank, maxrank):
         """ Index all entries of placex in the given rank range (inclusive)
 
     def index_by_rank(self, minrank, maxrank):
         """ Index all entries of placex in the given rank range (inclusive)
@@ -172,30 +199,48 @@ class Indexer:
         """
         maxrank = min(maxrank, 30)
         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
         """
         maxrank = min(maxrank, 30)
         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
-                    minrank, maxrank, len(self.threads))
+                    minrank, maxrank, self.num_threads)
 
 
-        for rank in range(max(1, minrank), maxrank):
-            self.index(RankRunner(rank))
+        self._setup_connections()
 
 
-        if maxrank == 30:
-            self.index(RankRunner(0))
-            self.index(InterpolationRunner(), 20)
-            self.index(RankRunner(30), 20)
-        else:
-            self.index(RankRunner(maxrank))
+        try:
+            for rank in range(max(1, minrank), maxrank):
+                self.index(RankRunner(rank))
+
+            if maxrank == 30:
+                self.index(RankRunner(0))
+                self.index(InterpolationRunner(), 20)
+                self.index(RankRunner(30), 20)
+            else:
+                self.index(RankRunner(maxrank))
+        finally:
+            self._close_connections()
 
 
     def index_postcodes(self):
         """Index the entries ofthe location_postcode table.
         """
 
 
     def index_postcodes(self):
         """Index the entries ofthe location_postcode table.
         """
-        self.index(PostcodeRunner(), 20)
+        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
+
+        self._setup_connections()
+
+        try:
+            self.index(PostcodeRunner(), 20)
+        finally:
+            self._close_connections()
 
     def update_status_table(self):
         """ Update the status in the status table to 'indexed'.
         """
 
     def update_status_table(self):
         """ Update the status in the status table to 'indexed'.
         """
-        with self.conn.cursor() as cur:
-            cur.execute('UPDATE import_status SET indexed = true')
-        self.conn.commit()
+        conn = psycopg2.connect(self.dsn)
+
+        try:
+            with conn.cursor() as cur:
+                cur.execute('UPDATE import_status SET indexed = true')
+
+            conn.commit()
+        finally:
+            conn.close()
 
     def index(self, obj, batch=1):
         """ Index a single rank or table. `obj` describes the SQL to use
 
     def index(self, obj, batch=1):
         """ Index a single rank or table. `obj` describes the SQL to use