]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tokenizer/legacy_tokenizer.py
properly close connections when shutting down starlette
[nominatim.git] / nominatim / tokenizer / legacy_tokenizer.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Tokenizer implementing normalisation as used before Nominatim 4.
9 """
10 from typing import Optional, Sequence, List, Tuple, Mapping, Any, Callable, \
11                    cast, Dict, Set, Iterable
12 from collections import OrderedDict
13 import logging
14 from pathlib import Path
15 import re
16 import shutil
17 from textwrap import dedent
18
19 from icu import Transliterator
20 import psycopg2
21 import psycopg2.extras
22
23 from nominatim.db.connection import connect, Connection
24 from nominatim.config import Configuration
25 from nominatim.db import properties
26 from nominatim.db import utils as db_utils
27 from nominatim.db.sql_preprocessor import SQLPreprocessor
28 from nominatim.data.place_info import PlaceInfo
29 from nominatim.errors import UsageError
30 from nominatim.tokenizer.base import AbstractAnalyzer, AbstractTokenizer
31
32 DBCFG_NORMALIZATION = "tokenizer_normalization"
33 DBCFG_MAXWORDFREQ = "tokenizer_maxwordfreq"
34
35 LOG = logging.getLogger()
36
37 def create(dsn: str, data_dir: Path) -> 'LegacyTokenizer':
38     """ Create a new instance of the tokenizer provided by this module.
39     """
40     return LegacyTokenizer(dsn, data_dir)
41
42
43 def _install_module(config_module_path: str, src_dir: Path, module_dir: Path) -> str:
44     """ Copies the PostgreSQL normalisation module into the project
45         directory if necessary. For historical reasons the module is
46         saved in the '/module' subdirectory and not with the other tokenizer
47         data.
48
49         The function detects when the installation is run from the
50         build directory. It doesn't touch the module in that case.
51     """
52     # Custom module locations are simply used as is.
53     if config_module_path:
54         LOG.info("Using custom path for database module at '%s'", config_module_path)
55         return config_module_path
56
57     # Compatibility mode for builddir installations.
58     if module_dir.exists() and src_dir.samefile(module_dir):
59         LOG.info('Running from build directory. Leaving database module as is.')
60         return str(module_dir)
61
62     # In any other case install the module in the project directory.
63     if not module_dir.exists():
64         module_dir.mkdir()
65
66     destfile = module_dir / 'nominatim.so'
67     shutil.copy(str(src_dir / 'nominatim.so'), str(destfile))
68     destfile.chmod(0o755)
69
70     LOG.info('Database module installed at %s', str(destfile))
71
72     return str(module_dir)
73
74
75 def _check_module(module_dir: str, conn: Connection) -> None:
76     """ Try to use the PostgreSQL module to confirm that it is correctly
77         installed and accessible from PostgreSQL.
78     """
79     with conn.cursor() as cur:
80         try:
81             cur.execute("""CREATE FUNCTION nominatim_test_import_func(text)
82                            RETURNS text AS %s, 'transliteration'
83                            LANGUAGE c IMMUTABLE STRICT;
84                            DROP FUNCTION nominatim_test_import_func(text)
85                         """, (f'{module_dir}/nominatim.so', ))
86         except psycopg2.DatabaseError as err:
87             LOG.fatal("Error accessing database module: %s", err)
88             raise UsageError("Database module cannot be accessed.") from err
89
90
91 class LegacyTokenizer(AbstractTokenizer):
92     """ The legacy tokenizer uses a special PostgreSQL module to normalize
93         names and queries. The tokenizer thus implements normalization through
94         calls to the database.
95     """
96
97     def __init__(self, dsn: str, data_dir: Path) -> None:
98         self.dsn = dsn
99         self.data_dir = data_dir
100         self.normalization: Optional[str] = None
101
102
103     def init_new_db(self, config: Configuration, init_db: bool = True) -> None:
104         """ Set up a new tokenizer for the database.
105
106             This copies all necessary data in the project directory to make
107             sure the tokenizer remains stable even over updates.
108         """
109         assert config.project_dir is not None
110         module_dir = _install_module(config.DATABASE_MODULE_PATH,
111                                      config.lib_dir.module,
112                                      config.project_dir / 'module')
113
114         self.normalization = config.TERM_NORMALIZATION
115
116         self._install_php(config, overwrite=True)
117
118         with connect(self.dsn) as conn:
119             _check_module(module_dir, conn)
120             self._save_config(conn, config)
121             conn.commit()
122
123         if init_db:
124             self.update_sql_functions(config)
125             self._init_db_tables(config)
126
127
128     def init_from_project(self, config: Configuration) -> None:
129         """ Initialise the tokenizer from the project directory.
130         """
131         assert config.project_dir is not None
132
133         with connect(self.dsn) as conn:
134             self.normalization = properties.get_property(conn, DBCFG_NORMALIZATION)
135
136         if not (config.project_dir / 'module' / 'nominatim.so').exists():
137             _install_module(config.DATABASE_MODULE_PATH,
138                             config.lib_dir.module,
139                             config.project_dir / 'module')
140
141         self._install_php(config, overwrite=False)
142
143     def finalize_import(self, config: Configuration) -> None:
144         """ Do any required postprocessing to make the tokenizer data ready
145             for use.
146         """
147         with connect(self.dsn) as conn:
148             sqlp = SQLPreprocessor(conn, config)
149             sqlp.run_sql_file(conn, 'tokenizer/legacy_tokenizer_indices.sql')
150
151
152     def update_sql_functions(self, config: Configuration) -> None:
153         """ Reimport the SQL functions for this tokenizer.
154         """
155         assert config.project_dir is not None
156
157         with connect(self.dsn) as conn:
158             max_word_freq = properties.get_property(conn, DBCFG_MAXWORDFREQ)
159             modulepath = config.DATABASE_MODULE_PATH or \
160                          str((config.project_dir / 'module').resolve())
161             sqlp = SQLPreprocessor(conn, config)
162             sqlp.run_sql_file(conn, 'tokenizer/legacy_tokenizer.sql',
163                               max_word_freq=max_word_freq,
164                               modulepath=modulepath)
165
166
167     def check_database(self, _: Configuration) -> Optional[str]:
168         """ Check that the tokenizer is set up correctly.
169         """
170         hint = """\
171              The Postgresql extension nominatim.so was not correctly loaded.
172
173              Error: {error}
174
175              Hints:
176              * Check the output of the CMmake/make installation step
177              * Does nominatim.so exist?
178              * Does nominatim.so exist on the database server?
179              * Can nominatim.so be accessed by the database user?
180              """
181         with connect(self.dsn) as conn:
182             with conn.cursor() as cur:
183                 try:
184                     out = cur.scalar("SELECT make_standard_name('a')")
185                 except psycopg2.Error as err:
186                     return hint.format(error=str(err))
187
188         if out != 'a':
189             return hint.format(error='Unexpected result for make_standard_name()')
190
191         return None
192
193
194     def migrate_database(self, config: Configuration) -> None:
195         """ Initialise the project directory of an existing database for
196             use with this tokenizer.
197
198             This is a special migration function for updating existing databases
199             to new software versions.
200         """
201         assert config.project_dir is not None
202
203         self.normalization = config.TERM_NORMALIZATION
204         module_dir = _install_module(config.DATABASE_MODULE_PATH,
205                                      config.lib_dir.module,
206                                      config.project_dir / 'module')
207
208         with connect(self.dsn) as conn:
209             _check_module(module_dir, conn)
210             self._save_config(conn, config)
211
212
213     def update_statistics(self) -> None:
214         """ Recompute the frequency of full words.
215         """
216         with connect(self.dsn) as conn:
217             if conn.table_exists('search_name'):
218                 with conn.cursor() as cur:
219                     cur.drop_table("word_frequencies")
220                     LOG.info("Computing word frequencies")
221                     cur.execute("""CREATE TEMP TABLE word_frequencies AS
222                                      SELECT unnest(name_vector) as id, count(*)
223                                      FROM search_name GROUP BY id""")
224                     cur.execute("CREATE INDEX ON word_frequencies(id)")
225                     LOG.info("Update word table with recomputed frequencies")
226                     cur.execute("""UPDATE word SET search_name_count = count
227                                    FROM word_frequencies
228                                    WHERE word_token like ' %' and word_id = id""")
229                     cur.drop_table("word_frequencies")
230             conn.commit()
231
232
233     def update_word_tokens(self) -> None:
234         """ No house-keeping implemented for the legacy tokenizer.
235         """
236         LOG.info("No tokenizer clean-up available.")
237
238
239     def name_analyzer(self) -> 'LegacyNameAnalyzer':
240         """ Create a new analyzer for tokenizing names and queries
241             using this tokinzer. Analyzers are context managers and should
242             be used accordingly:
243
244             ```
245             with tokenizer.name_analyzer() as analyzer:
246                 analyser.tokenize()
247             ```
248
249             When used outside the with construct, the caller must ensure to
250             call the close() function before destructing the analyzer.
251
252             Analyzers are not thread-safe. You need to instantiate one per thread.
253         """
254         normalizer = Transliterator.createFromRules("phrase normalizer",
255                                                     self.normalization)
256         return LegacyNameAnalyzer(self.dsn, normalizer)
257
258
259     def _install_php(self, config: Configuration, overwrite: bool = True) -> None:
260         """ Install the php script for the tokenizer.
261         """
262         php_file = self.data_dir / "tokenizer.php"
263
264         if not php_file.exists() or overwrite:
265             php_file.write_text(dedent(f"""\
266                 <?php
267                 @define('CONST_Max_Word_Frequency', {config.MAX_WORD_FREQUENCY});
268                 @define('CONST_Term_Normalization_Rules', "{config.TERM_NORMALIZATION}");
269                 require_once('{config.lib_dir.php}/tokenizer/legacy_tokenizer.php');
270                 """), encoding='utf-8')
271
272
273     def _init_db_tables(self, config: Configuration) -> None:
274         """ Set up the word table and fill it with pre-computed word
275             frequencies.
276         """
277         with connect(self.dsn) as conn:
278             sqlp = SQLPreprocessor(conn, config)
279             sqlp.run_sql_file(conn, 'tokenizer/legacy_tokenizer_tables.sql')
280             conn.commit()
281
282         LOG.warning("Precomputing word tokens")
283         db_utils.execute_file(self.dsn, config.lib_dir.data / 'words.sql')
284
285
286     def _save_config(self, conn: Connection, config: Configuration) -> None:
287         """ Save the configuration that needs to remain stable for the given
288             database as database properties.
289         """
290         assert self.normalization is not None
291
292         properties.set_property(conn, DBCFG_NORMALIZATION, self.normalization)
293         properties.set_property(conn, DBCFG_MAXWORDFREQ, config.MAX_WORD_FREQUENCY)
294
295
296 class LegacyNameAnalyzer(AbstractAnalyzer):
297     """ The legacy analyzer uses the special Postgresql module for
298         splitting names.
299
300         Each instance opens a connection to the database to request the
301         normalization.
302     """
303
304     def __init__(self, dsn: str, normalizer: Any):
305         self.conn: Optional[Connection] = connect(dsn).connection
306         self.conn.autocommit = True
307         self.normalizer = normalizer
308         psycopg2.extras.register_hstore(self.conn)
309
310         self._cache = _TokenCache(self.conn)
311
312
313     def close(self) -> None:
314         """ Free all resources used by the analyzer.
315         """
316         if self.conn:
317             self.conn.close()
318             self.conn = None
319
320
321     def get_word_token_info(self, words: Sequence[str]) -> List[Tuple[str, str, int]]:
322         """ Return token information for the given list of words.
323             If a word starts with # it is assumed to be a full name
324             otherwise is a partial name.
325
326             The function returns a list of tuples with
327             (original word, word token, word id).
328
329             The function is used for testing and debugging only
330             and not necessarily efficient.
331         """
332         assert self.conn is not None
333         with self.conn.cursor() as cur:
334             cur.execute("""SELECT t.term, word_token, word_id
335                            FROM word, (SELECT unnest(%s::TEXT[]) as term) t
336                            WHERE word_token = (CASE
337                                    WHEN left(t.term, 1) = '#' THEN
338                                      ' ' || make_standard_name(substring(t.term from 2))
339                                    ELSE
340                                      make_standard_name(t.term)
341                                    END)
342                                  and class is null and country_code is null""",
343                         (words, ))
344
345             return [(r[0], r[1], r[2]) for r in cur]
346
347
348     def normalize(self, phrase: str) -> str:
349         """ Normalize the given phrase, i.e. remove all properties that
350             are irrelevant for search.
351         """
352         return cast(str, self.normalizer.transliterate(phrase))
353
354
355     def normalize_postcode(self, postcode: str) -> str:
356         """ Convert the postcode to a standardized form.
357
358             This function must yield exactly the same result as the SQL function
359             'token_normalized_postcode()'.
360         """
361         return postcode.strip().upper()
362
363
364     def update_postcodes_from_db(self) -> None:
365         """ Update postcode tokens in the word table from the location_postcode
366             table.
367         """
368         assert self.conn is not None
369
370         with self.conn.cursor() as cur:
371             # This finds us the rows in location_postcode and word that are
372             # missing in the other table.
373             cur.execute("""SELECT * FROM
374                             (SELECT pc, word FROM
375                               (SELECT distinct(postcode) as pc FROM location_postcode) p
376                               FULL JOIN
377                               (SELECT word FROM word
378                                 WHERE class ='place' and type = 'postcode') w
379                               ON pc = word) x
380                            WHERE pc is null or word is null""")
381
382             to_delete = []
383             to_add = []
384
385             for postcode, word in cur:
386                 if postcode is None:
387                     to_delete.append(word)
388                 else:
389                     to_add.append(postcode)
390
391             if to_delete:
392                 cur.execute("""DELETE FROM WORD
393                                WHERE class ='place' and type = 'postcode'
394                                      and word = any(%s)
395                             """, (to_delete, ))
396             if to_add:
397                 cur.execute("""SELECT count(create_postcode_id(pc))
398                                FROM unnest(%s) as pc
399                             """, (to_add, ))
400
401
402
403     def update_special_phrases(self, phrases: Iterable[Tuple[str, str, str, str]],
404                                should_replace: bool) -> None:
405         """ Replace the search index for special phrases with the new phrases.
406         """
407         assert self.conn is not None
408
409         norm_phrases = set(((self.normalize(p[0]), p[1], p[2], p[3])
410                             for p in phrases))
411
412         with self.conn.cursor() as cur:
413             # Get the old phrases.
414             existing_phrases = set()
415             cur.execute("""SELECT word, class, type, operator FROM word
416                            WHERE class != 'place'
417                                  OR (type != 'house' AND type != 'postcode')""")
418             for label, cls, typ, oper in cur:
419                 existing_phrases.add((label, cls, typ, oper or '-'))
420
421             to_add = norm_phrases - existing_phrases
422             to_delete = existing_phrases - norm_phrases
423
424             if to_add:
425                 cur.execute_values(
426                     """ INSERT INTO word (word_id, word_token, word, class, type,
427                                           search_name_count, operator)
428                         (SELECT nextval('seq_word'), ' ' || make_standard_name(name), name,
429                                 class, type, 0,
430                                 CASE WHEN op in ('in', 'near') THEN op ELSE null END
431                            FROM (VALUES %s) as v(name, class, type, op))""",
432                     to_add)
433
434             if to_delete and should_replace:
435                 cur.execute_values(
436                     """ DELETE FROM word USING (VALUES %s) as v(name, in_class, in_type, op)
437                         WHERE word = name and class = in_class and type = in_type
438                               and ((op = '-' and operator is null) or op = operator)""",
439                     to_delete)
440
441         LOG.info("Total phrases: %s. Added: %s. Deleted: %s",
442                  len(norm_phrases), len(to_add), len(to_delete))
443
444
445     def add_country_names(self, country_code: str, names: Mapping[str, str]) -> None:
446         """ Add names for the given country to the search index.
447         """
448         assert self.conn is not None
449
450         with self.conn.cursor() as cur:
451             cur.execute(
452                 """INSERT INTO word (word_id, word_token, country_code)
453                    (SELECT nextval('seq_word'), lookup_token, %s
454                       FROM (SELECT DISTINCT ' ' || make_standard_name(n) as lookup_token
455                             FROM unnest(%s)n) y
456                       WHERE NOT EXISTS(SELECT * FROM word
457                                        WHERE word_token = lookup_token and country_code = %s))
458                 """, (country_code, list(names.values()), country_code))
459
460
461     def process_place(self, place: PlaceInfo) -> Mapping[str, Any]:
462         """ Determine tokenizer information about the given place.
463
464             Returns a JSON-serialisable structure that will be handed into
465             the database via the token_info field.
466         """
467         assert self.conn is not None
468
469         token_info = _TokenInfo(self._cache)
470
471         names = place.name
472
473         if names:
474             token_info.add_names(self.conn, names)
475
476             if place.is_country():
477                 assert place.country_code is not None
478                 self.add_country_names(place.country_code, names)
479
480         address = place.address
481         if address:
482             self._process_place_address(token_info, address)
483
484         return token_info.data
485
486
487     def _process_place_address(self, token_info: '_TokenInfo', address: Mapping[str, str]) -> None:
488         assert self.conn is not None
489         hnrs = []
490         addr_terms = []
491
492         for key, value in address.items():
493             if key == 'postcode':
494                 # Make sure the normalized postcode is present in the word table.
495                 if re.search(r'[:,;]', value) is None:
496                     norm_pc = self.normalize_postcode(value)
497                     token_info.set_postcode(norm_pc)
498                     self._cache.add_postcode(self.conn, norm_pc)
499             elif key in ('housenumber', 'streetnumber', 'conscriptionnumber'):
500                 hnrs.append(value)
501             elif key == 'street':
502                 token_info.add_street(self.conn, value)
503             elif key == 'place':
504                 token_info.add_place(self.conn, value)
505             elif not key.startswith('_') \
506                  and key not in ('country', 'full', 'inclusion'):
507                 addr_terms.append((key, value))
508
509         if hnrs:
510             token_info.add_housenumbers(self.conn, hnrs)
511
512         if addr_terms:
513             token_info.add_address_terms(self.conn, addr_terms)
514
515
516
517 class _TokenInfo:
518     """ Collect token information to be sent back to the database.
519     """
520     def __init__(self, cache: '_TokenCache') -> None:
521         self.cache = cache
522         self.data: Dict[str, Any] = {}
523
524
525     def add_names(self, conn: Connection, names: Mapping[str, str]) -> None:
526         """ Add token information for the names of the place.
527         """
528         with conn.cursor() as cur:
529             # Create the token IDs for all names.
530             self.data['names'] = cur.scalar("SELECT make_keywords(%s)::text",
531                                             (names, ))
532
533
534     def add_housenumbers(self, conn: Connection, hnrs: Sequence[str]) -> None:
535         """ Extract housenumber information from the address.
536         """
537         if len(hnrs) == 1:
538             token = self.cache.get_housenumber(hnrs[0])
539             if token is not None:
540                 self.data['hnr_tokens'] = token
541                 self.data['hnr'] = hnrs[0]
542                 return
543
544         # split numbers if necessary
545         simple_list: List[str] = []
546         for hnr in hnrs:
547             simple_list.extend((x.strip() for x in re.split(r'[;,]', hnr)))
548
549         if len(simple_list) > 1:
550             simple_list = list(set(simple_list))
551
552         with conn.cursor() as cur:
553             cur.execute("SELECT * FROM create_housenumbers(%s)", (simple_list, ))
554             result = cur.fetchone()
555             assert result is not None
556             self.data['hnr_tokens'], self.data['hnr'] = result
557
558
559     def set_postcode(self, postcode: str) -> None:
560         """ Set or replace the postcode token with the given value.
561         """
562         self.data['postcode'] = postcode
563
564     def add_street(self, conn: Connection, street: str) -> None:
565         """ Add addr:street match terms.
566         """
567         def _get_street(name: str) -> List[int]:
568             with conn.cursor() as cur:
569                 return cast(List[int],
570                             cur.scalar("SELECT word_ids_from_name(%s)::text", (name, )))
571
572         tokens = self.cache.streets.get(street, _get_street)
573         if tokens:
574             self.data['street'] = tokens
575
576
577     def add_place(self, conn: Connection, place: str) -> None:
578         """ Add addr:place search and match terms.
579         """
580         def _get_place(name: str) -> Tuple[List[int], List[int]]:
581             with conn.cursor() as cur:
582                 cur.execute("""SELECT make_keywords(hstore('name' , %s))::text,
583                                       word_ids_from_name(%s)::text""",
584                             (name, name))
585                 return cast(Tuple[List[int], List[int]], cur.fetchone())
586
587         self.data['place_search'], self.data['place_match'] = \
588             self.cache.places.get(place, _get_place)
589
590
591     def add_address_terms(self, conn: Connection, terms: Sequence[Tuple[str, str]]) -> None:
592         """ Add additional address terms.
593         """
594         def _get_address_term(name: str) -> Tuple[List[int], List[int]]:
595             with conn.cursor() as cur:
596                 cur.execute("""SELECT addr_ids_from_name(%s)::text,
597                                       word_ids_from_name(%s)::text""",
598                             (name, name))
599                 return cast(Tuple[List[int], List[int]], cur.fetchone())
600
601         tokens = {}
602         for key, value in terms:
603             items = self.cache.address_terms.get(value, _get_address_term)
604             if items[0] or items[1]:
605                 tokens[key] = items
606
607         if tokens:
608             self.data['addr'] = tokens
609
610
611 class _LRU:
612     """ Least recently used cache that accepts a generator function to
613         produce the item when there is a cache miss.
614     """
615
616     def __init__(self, maxsize: int = 128):
617         self.data: 'OrderedDict[str, Any]' = OrderedDict()
618         self.maxsize = maxsize
619
620
621     def get(self, key: str, generator: Callable[[str], Any]) -> Any:
622         """ Get the item with the given key from the cache. If nothing
623             is found in the cache, generate the value through the
624             generator function and store it in the cache.
625         """
626         value = self.data.get(key)
627         if value is not None:
628             self.data.move_to_end(key)
629         else:
630             value = generator(key)
631             if len(self.data) >= self.maxsize:
632                 self.data.popitem(last=False)
633             self.data[key] = value
634
635         return value
636
637
638 class _TokenCache:
639     """ Cache for token information to avoid repeated database queries.
640
641         This cache is not thread-safe and needs to be instantiated per
642         analyzer.
643     """
644     def __init__(self, conn: Connection):
645         # various LRU caches
646         self.streets = _LRU(maxsize=256)
647         self.places = _LRU(maxsize=128)
648         self.address_terms = _LRU(maxsize=1024)
649
650         # Lookup houseunumbers up to 100 and cache them
651         with conn.cursor() as cur:
652             cur.execute("""SELECT i, ARRAY[getorcreate_housenumber_id(i::text)]::text
653                            FROM generate_series(1, 100) as i""")
654             self._cached_housenumbers: Dict[str, str] = {str(r[0]): r[1] for r in cur}
655
656         # For postcodes remember the ones that have already been added
657         self.postcodes: Set[str] = set()
658
659     def get_housenumber(self, number: str) -> Optional[str]:
660         """ Get a housenumber token from the cache.
661         """
662         return self._cached_housenumbers.get(number)
663
664
665     def add_postcode(self, conn: Connection, postcode: str) -> None:
666         """ Make sure the given postcode is in the database.
667         """
668         if postcode not in self.postcodes:
669             with conn.cursor() as cur:
670                 cur.execute('SELECT create_postcode_id(%s)', (postcode, ))
671             self.postcodes.add(postcode)