]> git.openstreetmap.org Git - nominatim.git/commitdiff
adapt to new type annotations from typeshed
authorSarah Hoffmann <lonvia@denofr.de>
Tue, 9 Aug 2022 09:06:54 +0000 (11:06 +0200)
committerSarah Hoffmann <lonvia@denofr.de>
Tue, 9 Aug 2022 09:06:54 +0000 (11:06 +0200)
Some more functions frrom psycopg are now properly annotated.
No ignoring necessary anymore.

nominatim/db/connection.py
nominatim/db/properties.py
nominatim/db/status.py
nominatim/tokenizer/icu_tokenizer.py
nominatim/tokenizer/legacy_tokenizer.py
nominatim/tools/admin.py

index 86ead02c61267d9f552344b9e2461c1d258c0d47..44a293d43b7f58dbfd19690525989b69333a9486 100644 (file)
@@ -55,7 +55,7 @@ class Cursor(psycopg2.extras.DictCursor):
         if self.rowcount != 1:
             raise RuntimeError("Query did not return a single row.")
 
-        result = self.fetchone() # type: ignore[no-untyped-call]
+        result = self.fetchone()
         assert result is not None
 
         return result[0]
@@ -131,7 +131,7 @@ class Connection(psycopg2.extensions.connection):
                 return False
 
             if table is not None:
-                row = cur.fetchone() # type: ignore[no-untyped-call]
+                row = cur.fetchone()
                 if row is None or not isinstance(row[0], str):
                     return False
                 return row[0] == table
@@ -236,7 +236,7 @@ def get_pg_env(dsn: str,
     """
     env = dict(base_env if base_env is not None else os.environ)
 
-    for param, value in psycopg2.extensions.parse_dsn(dsn).items(): # type: ignore
+    for param, value in psycopg2.extensions.parse_dsn(dsn).items():
         if param in _PG_CONNECTION_STRINGS:
             env[_PG_CONNECTION_STRINGS[param]] = value
         else:
index 40cb262edf4e048592ac8cc45849b2edeff669b8..e8d5e0ca5c47194f7e72402f3c6b541a251c6a2a 100644 (file)
@@ -41,4 +41,7 @@ def get_property(conn: Connection, name: str) -> Optional[str]:
         if cur.rowcount == 0:
             return None
 
-        return cast(Optional[str], cur.fetchone()[0]) # type: ignore[no-untyped-call]
+        result = cur.fetchone()
+        assert result is not None
+
+        return cast(Optional[str], result[0])
index aea25a976d8829a0fe65247e9d6fa8acb430c6c7..2c01de71466dca8fb237461509062c5c8c3e248a 100644 (file)
@@ -90,7 +90,7 @@ def get_status(conn: Connection) -> Tuple[Optional[dt.datetime], Optional[int],
         if cur.rowcount < 1:
             return None, None, None
 
-        row = cast(StatusRow, cur.fetchone()) # type: ignore[no-untyped-call]
+        row = cast(StatusRow, cur.fetchone())
         return row['lastimportdate'], row['sequence_id'], row['indexed']
 
 
index 319838a16849b7bc9d1bdae31b27dab07594eb5a..79f383f6f1c11b2e8aac46b09fe1afb75e26a981 100644 (file)
@@ -566,8 +566,9 @@ class ICUNameAnalyzer(AbstractAnalyzer):
                 result = self._cache.housenumbers.get(norm_name, result)
                 if result[0] is None:
                     with self.conn.cursor() as cur:
-                        cur.execute("SELECT getorcreate_hnr_id(%s)", (norm_name, ))
-                        result = cur.fetchone()[0], norm_name # type: ignore[no-untyped-call]
+                        hid = cur.scalar("SELECT getorcreate_hnr_id(%s)", (norm_name, ))
+
+                        result = hid, norm_name
                         self._cache.housenumbers[norm_name] = result
         else:
             # Otherwise use the analyzer to determine the canonical name.
@@ -580,9 +581,9 @@ class ICUNameAnalyzer(AbstractAnalyzer):
                     variants = analyzer.compute_variants(word_id)
                     if variants:
                         with self.conn.cursor() as cur:
-                            cur.execute("SELECT create_analyzed_hnr_id(%s, %s)",
-                                        (word_id, list(variants)))
-                            result = cur.fetchone()[0], variants[0] # type: ignore[no-untyped-call]
+                            hid = cur.scalar("SELECT create_analyzed_hnr_id(%s, %s)",
+                                             (word_id, list(variants)))
+                            result = hid, variants[0]
                             self._cache.housenumbers[word_id] = result
 
         return result
@@ -665,8 +666,7 @@ class ICUNameAnalyzer(AbstractAnalyzer):
                 with self.conn.cursor() as cur:
                     cur.execute("SELECT * FROM getorcreate_full_word(%s, %s)",
                                 (token_id, variants))
-                    full, part = cast(Tuple[int, List[int]],
-                                      cur.fetchone()) # type: ignore[no-untyped-call]
+                    full, part = cast(Tuple[int, List[int]], cur.fetchone())
 
                 self._cache.names[token_id] = (full, part)
 
index f52eaadac36eafbd5fa45eea4651c059e89b7d13..1b0b2980f9048f0fb42ac82e9b5bc8c8fe4f6298 100644 (file)
@@ -544,8 +544,9 @@ class _TokenInfo:
 
         with conn.cursor() as cur:
             cur.execute("SELECT * FROM create_housenumbers(%s)", (simple_list, ))
-            self.data['hnr_tokens'], self.data['hnr'] = \
-                cur.fetchone() # type: ignore[no-untyped-call]
+            result = cur.fetchone()
+            assert result is not None
+            self.data['hnr_tokens'], self.data['hnr'] = result
 
 
     def set_postcode(self, postcode: str) -> None:
@@ -574,8 +575,7 @@ class _TokenInfo:
                 cur.execute("""SELECT make_keywords(hstore('name' , %s))::text,
                                       word_ids_from_name(%s)::text""",
                             (name, name))
-                return cast(Tuple[List[int], List[int]],
-                            cur.fetchone()) # type: ignore[no-untyped-call]
+                return cast(Tuple[List[int], List[int]], cur.fetchone())
 
         self.data['place_search'], self.data['place_match'] = \
             self.cache.places.get(place, _get_place)
@@ -589,8 +589,7 @@ class _TokenInfo:
                 cur.execute("""SELECT addr_ids_from_name(%s)::text,
                                       word_ids_from_name(%s)::text""",
                             (name, name))
-                return cast(Tuple[List[int], List[int]],
-                            cur.fetchone()) # type: ignore[no-untyped-call]
+                return cast(Tuple[List[int], List[int]], cur.fetchone())
 
         tokens = {}
         for key, value in terms:
index 9fb944d3ada0b45e956396253ed2ebd3cdd2c856..da7845ebc949696145588b393664d4be2077eb37 100644 (file)
@@ -49,7 +49,7 @@ def _get_place_info(cursor: Cursor, osm_id: Optional[str],
         LOG.fatal("OSM object %s not found in database.", osm_id)
         raise UsageError("OSM object not found")
 
-    return cast(DictCursorResult, cursor.fetchone()) # type: ignore[no-untyped-call]
+    return cast(DictCursorResult, cursor.fetchone())
 
 
 def analyse_indexing(config: Configuration, osm_id: Optional[str] = None,