]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tokenizer/legacy_tokenizer.py
Merge pull request #2903 from lonvia/migration-for-index-reorganization
[nominatim.git] / nominatim / tokenizer / legacy_tokenizer.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Tokenizer implementing normalisation as used before Nominatim 4.
9 """
10 from typing import Optional, Sequence, List, Tuple, Mapping, Any, Callable, \
11                    cast, Dict, Set, Iterable
12 from collections import OrderedDict
13 import logging
14 from pathlib import Path
15 import re
16 import shutil
17 from textwrap import dedent
18
19 from icu import Transliterator
20 import psycopg2
21 import psycopg2.extras
22
23 from nominatim.db.connection import connect, Connection
24 from nominatim.config import Configuration
25 from nominatim.db import properties
26 from nominatim.db import utils as db_utils
27 from nominatim.db.sql_preprocessor import SQLPreprocessor
28 from nominatim.data.place_info import PlaceInfo
29 from nominatim.errors import UsageError
30 from nominatim.tokenizer.base import AbstractAnalyzer, AbstractTokenizer
31
32 DBCFG_NORMALIZATION = "tokenizer_normalization"
33 DBCFG_MAXWORDFREQ = "tokenizer_maxwordfreq"
34
35 LOG = logging.getLogger()
36
37 def create(dsn: str, data_dir: Path) -> 'LegacyTokenizer':
38     """ Create a new instance of the tokenizer provided by this module.
39     """
40     return LegacyTokenizer(dsn, data_dir)
41
42
43 def _install_module(config_module_path: str, src_dir: Path, module_dir: Path) -> str:
44     """ Copies the PostgreSQL normalisation module into the project
45         directory if necessary. For historical reasons the module is
46         saved in the '/module' subdirectory and not with the other tokenizer
47         data.
48
49         The function detects when the installation is run from the
50         build directory. It doesn't touch the module in that case.
51     """
52     # Custom module locations are simply used as is.
53     if config_module_path:
54         LOG.info("Using custom path for database module at '%s'", config_module_path)
55         return config_module_path
56
57     # Compatibility mode for builddir installations.
58     if module_dir.exists() and src_dir.samefile(module_dir):
59         LOG.info('Running from build directory. Leaving database module as is.')
60         return str(module_dir)
61
62     # In any other case install the module in the project directory.
63     if not module_dir.exists():
64         module_dir.mkdir()
65
66     destfile = module_dir / 'nominatim.so'
67     shutil.copy(str(src_dir / 'nominatim.so'), str(destfile))
68     destfile.chmod(0o755)
69
70     LOG.info('Database module installed at %s', str(destfile))
71
72     return str(module_dir)
73
74
75 def _check_module(module_dir: str, conn: Connection) -> None:
76     """ Try to use the PostgreSQL module to confirm that it is correctly
77         installed and accessible from PostgreSQL.
78     """
79     with conn.cursor() as cur:
80         try:
81             cur.execute("""CREATE FUNCTION nominatim_test_import_func(text)
82                            RETURNS text AS %s, 'transliteration'
83                            LANGUAGE c IMMUTABLE STRICT;
84                            DROP FUNCTION nominatim_test_import_func(text)
85                         """, (f'{module_dir}/nominatim.so', ))
86         except psycopg2.DatabaseError as err:
87             LOG.fatal("Error accessing database module: %s", err)
88             raise UsageError("Database module cannot be accessed.") from err
89
90
91 class LegacyTokenizer(AbstractTokenizer):
92     """ The legacy tokenizer uses a special PostgreSQL module to normalize
93         names and queries. The tokenizer thus implements normalization through
94         calls to the database.
95     """
96
97     def __init__(self, dsn: str, data_dir: Path) -> None:
98         self.dsn = dsn
99         self.data_dir = data_dir
100         self.normalization: Optional[str] = None
101
102
103     def init_new_db(self, config: Configuration, init_db: bool = True) -> None:
104         """ Set up a new tokenizer for the database.
105
106             This copies all necessary data in the project directory to make
107             sure the tokenizer remains stable even over updates.
108         """
109         module_dir = _install_module(config.DATABASE_MODULE_PATH,
110                                      config.lib_dir.module,
111                                      config.project_dir / 'module')
112
113         self.normalization = config.TERM_NORMALIZATION
114
115         self._install_php(config, overwrite=True)
116
117         with connect(self.dsn) as conn:
118             _check_module(module_dir, conn)
119             self._save_config(conn, config)
120             conn.commit()
121
122         if init_db:
123             self.update_sql_functions(config)
124             self._init_db_tables(config)
125
126
127     def init_from_project(self, config: Configuration) -> None:
128         """ Initialise the tokenizer from the project directory.
129         """
130         with connect(self.dsn) as conn:
131             self.normalization = properties.get_property(conn, DBCFG_NORMALIZATION)
132
133         if not (config.project_dir / 'module' / 'nominatim.so').exists():
134             _install_module(config.DATABASE_MODULE_PATH,
135                             config.lib_dir.module,
136                             config.project_dir / 'module')
137
138         self._install_php(config, overwrite=False)
139
140     def finalize_import(self, config: Configuration) -> None:
141         """ Do any required postprocessing to make the tokenizer data ready
142             for use.
143         """
144         with connect(self.dsn) as conn:
145             sqlp = SQLPreprocessor(conn, config)
146             sqlp.run_sql_file(conn, 'tokenizer/legacy_tokenizer_indices.sql')
147
148
149     def update_sql_functions(self, config: Configuration) -> None:
150         """ Reimport the SQL functions for this tokenizer.
151         """
152         with connect(self.dsn) as conn:
153             max_word_freq = properties.get_property(conn, DBCFG_MAXWORDFREQ)
154             modulepath = config.DATABASE_MODULE_PATH or \
155                          str((config.project_dir / 'module').resolve())
156             sqlp = SQLPreprocessor(conn, config)
157             sqlp.run_sql_file(conn, 'tokenizer/legacy_tokenizer.sql',
158                               max_word_freq=max_word_freq,
159                               modulepath=modulepath)
160
161
162     def check_database(self, _: Configuration) -> Optional[str]:
163         """ Check that the tokenizer is set up correctly.
164         """
165         hint = """\
166              The Postgresql extension nominatim.so was not correctly loaded.
167
168              Error: {error}
169
170              Hints:
171              * Check the output of the CMmake/make installation step
172              * Does nominatim.so exist?
173              * Does nominatim.so exist on the database server?
174              * Can nominatim.so be accessed by the database user?
175              """
176         with connect(self.dsn) as conn:
177             with conn.cursor() as cur:
178                 try:
179                     out = cur.scalar("SELECT make_standard_name('a')")
180                 except psycopg2.Error as err:
181                     return hint.format(error=str(err))
182
183         if out != 'a':
184             return hint.format(error='Unexpected result for make_standard_name()')
185
186         return None
187
188
189     def migrate_database(self, config: Configuration) -> None:
190         """ Initialise the project directory of an existing database for
191             use with this tokenizer.
192
193             This is a special migration function for updating existing databases
194             to new software versions.
195         """
196         self.normalization = config.TERM_NORMALIZATION
197         module_dir = _install_module(config.DATABASE_MODULE_PATH,
198                                      config.lib_dir.module,
199                                      config.project_dir / 'module')
200
201         with connect(self.dsn) as conn:
202             _check_module(module_dir, conn)
203             self._save_config(conn, config)
204
205
206     def update_statistics(self) -> None:
207         """ Recompute the frequency of full words.
208         """
209         with connect(self.dsn) as conn:
210             if conn.table_exists('search_name'):
211                 with conn.cursor() as cur:
212                     cur.drop_table("word_frequencies")
213                     LOG.info("Computing word frequencies")
214                     cur.execute("""CREATE TEMP TABLE word_frequencies AS
215                                      SELECT unnest(name_vector) as id, count(*)
216                                      FROM search_name GROUP BY id""")
217                     cur.execute("CREATE INDEX ON word_frequencies(id)")
218                     LOG.info("Update word table with recomputed frequencies")
219                     cur.execute("""UPDATE word SET search_name_count = count
220                                    FROM word_frequencies
221                                    WHERE word_token like ' %' and word_id = id""")
222                     cur.drop_table("word_frequencies")
223             conn.commit()
224
225
226     def update_word_tokens(self) -> None:
227         """ No house-keeping implemented for the legacy tokenizer.
228         """
229         LOG.info("No tokenizer clean-up available.")
230
231
232     def name_analyzer(self) -> 'LegacyNameAnalyzer':
233         """ Create a new analyzer for tokenizing names and queries
234             using this tokinzer. Analyzers are context managers and should
235             be used accordingly:
236
237             ```
238             with tokenizer.name_analyzer() as analyzer:
239                 analyser.tokenize()
240             ```
241
242             When used outside the with construct, the caller must ensure to
243             call the close() function before destructing the analyzer.
244
245             Analyzers are not thread-safe. You need to instantiate one per thread.
246         """
247         normalizer = Transliterator.createFromRules("phrase normalizer",
248                                                     self.normalization)
249         return LegacyNameAnalyzer(self.dsn, normalizer)
250
251
252     def _install_php(self, config: Configuration, overwrite: bool = True) -> None:
253         """ Install the php script for the tokenizer.
254         """
255         php_file = self.data_dir / "tokenizer.php"
256
257         if not php_file.exists() or overwrite:
258             php_file.write_text(dedent(f"""\
259                 <?php
260                 @define('CONST_Max_Word_Frequency', {config.MAX_WORD_FREQUENCY});
261                 @define('CONST_Term_Normalization_Rules', "{config.TERM_NORMALIZATION}");
262                 require_once('{config.lib_dir.php}/tokenizer/legacy_tokenizer.php');
263                 """), encoding='utf-8')
264
265
266     def _init_db_tables(self, config: Configuration) -> None:
267         """ Set up the word table and fill it with pre-computed word
268             frequencies.
269         """
270         with connect(self.dsn) as conn:
271             sqlp = SQLPreprocessor(conn, config)
272             sqlp.run_sql_file(conn, 'tokenizer/legacy_tokenizer_tables.sql')
273             conn.commit()
274
275         LOG.warning("Precomputing word tokens")
276         db_utils.execute_file(self.dsn, config.lib_dir.data / 'words.sql')
277
278
279     def _save_config(self, conn: Connection, config: Configuration) -> None:
280         """ Save the configuration that needs to remain stable for the given
281             database as database properties.
282         """
283         assert self.normalization is not None
284
285         properties.set_property(conn, DBCFG_NORMALIZATION, self.normalization)
286         properties.set_property(conn, DBCFG_MAXWORDFREQ, config.MAX_WORD_FREQUENCY)
287
288
289 class LegacyNameAnalyzer(AbstractAnalyzer):
290     """ The legacy analyzer uses the special Postgresql module for
291         splitting names.
292
293         Each instance opens a connection to the database to request the
294         normalization.
295     """
296
297     def __init__(self, dsn: str, normalizer: Any):
298         self.conn: Optional[Connection] = connect(dsn).connection
299         self.conn.autocommit = True
300         self.normalizer = normalizer
301         psycopg2.extras.register_hstore(self.conn)
302
303         self._cache = _TokenCache(self.conn)
304
305
306     def close(self) -> None:
307         """ Free all resources used by the analyzer.
308         """
309         if self.conn:
310             self.conn.close()
311             self.conn = None
312
313
314     def get_word_token_info(self, words: Sequence[str]) -> List[Tuple[str, str, int]]:
315         """ Return token information for the given list of words.
316             If a word starts with # it is assumed to be a full name
317             otherwise is a partial name.
318
319             The function returns a list of tuples with
320             (original word, word token, word id).
321
322             The function is used for testing and debugging only
323             and not necessarily efficient.
324         """
325         assert self.conn is not None
326         with self.conn.cursor() as cur:
327             cur.execute("""SELECT t.term, word_token, word_id
328                            FROM word, (SELECT unnest(%s::TEXT[]) as term) t
329                            WHERE word_token = (CASE
330                                    WHEN left(t.term, 1) = '#' THEN
331                                      ' ' || make_standard_name(substring(t.term from 2))
332                                    ELSE
333                                      make_standard_name(t.term)
334                                    END)
335                                  and class is null and country_code is null""",
336                         (words, ))
337
338             return [(r[0], r[1], r[2]) for r in cur]
339
340
341     def normalize(self, phrase: str) -> str:
342         """ Normalize the given phrase, i.e. remove all properties that
343             are irrelevant for search.
344         """
345         return cast(str, self.normalizer.transliterate(phrase))
346
347
348     def normalize_postcode(self, postcode: str) -> str:
349         """ Convert the postcode to a standardized form.
350
351             This function must yield exactly the same result as the SQL function
352             'token_normalized_postcode()'.
353         """
354         return postcode.strip().upper()
355
356
357     def update_postcodes_from_db(self) -> None:
358         """ Update postcode tokens in the word table from the location_postcode
359             table.
360         """
361         assert self.conn is not None
362
363         with self.conn.cursor() as cur:
364             # This finds us the rows in location_postcode and word that are
365             # missing in the other table.
366             cur.execute("""SELECT * FROM
367                             (SELECT pc, word FROM
368                               (SELECT distinct(postcode) as pc FROM location_postcode) p
369                               FULL JOIN
370                               (SELECT word FROM word
371                                 WHERE class ='place' and type = 'postcode') w
372                               ON pc = word) x
373                            WHERE pc is null or word is null""")
374
375             to_delete = []
376             to_add = []
377
378             for postcode, word in cur:
379                 if postcode is None:
380                     to_delete.append(word)
381                 else:
382                     to_add.append(postcode)
383
384             if to_delete:
385                 cur.execute("""DELETE FROM WORD
386                                WHERE class ='place' and type = 'postcode'
387                                      and word = any(%s)
388                             """, (to_delete, ))
389             if to_add:
390                 cur.execute("""SELECT count(create_postcode_id(pc))
391                                FROM unnest(%s) as pc
392                             """, (to_add, ))
393
394
395
396     def update_special_phrases(self, phrases: Iterable[Tuple[str, str, str, str]],
397                                should_replace: bool) -> None:
398         """ Replace the search index for special phrases with the new phrases.
399         """
400         assert self.conn is not None
401
402         norm_phrases = set(((self.normalize(p[0]), p[1], p[2], p[3])
403                             for p in phrases))
404
405         with self.conn.cursor() as cur:
406             # Get the old phrases.
407             existing_phrases = set()
408             cur.execute("""SELECT word, class, type, operator FROM word
409                            WHERE class != 'place'
410                                  OR (type != 'house' AND type != 'postcode')""")
411             for label, cls, typ, oper in cur:
412                 existing_phrases.add((label, cls, typ, oper or '-'))
413
414             to_add = norm_phrases - existing_phrases
415             to_delete = existing_phrases - norm_phrases
416
417             if to_add:
418                 cur.execute_values(
419                     """ INSERT INTO word (word_id, word_token, word, class, type,
420                                           search_name_count, operator)
421                         (SELECT nextval('seq_word'), ' ' || make_standard_name(name), name,
422                                 class, type, 0,
423                                 CASE WHEN op in ('in', 'near') THEN op ELSE null END
424                            FROM (VALUES %s) as v(name, class, type, op))""",
425                     to_add)
426
427             if to_delete and should_replace:
428                 cur.execute_values(
429                     """ DELETE FROM word USING (VALUES %s) as v(name, in_class, in_type, op)
430                         WHERE word = name and class = in_class and type = in_type
431                               and ((op = '-' and operator is null) or op = operator)""",
432                     to_delete)
433
434         LOG.info("Total phrases: %s. Added: %s. Deleted: %s",
435                  len(norm_phrases), len(to_add), len(to_delete))
436
437
438     def add_country_names(self, country_code: str, names: Mapping[str, str]) -> None:
439         """ Add names for the given country to the search index.
440         """
441         assert self.conn is not None
442
443         with self.conn.cursor() as cur:
444             cur.execute(
445                 """INSERT INTO word (word_id, word_token, country_code)
446                    (SELECT nextval('seq_word'), lookup_token, %s
447                       FROM (SELECT DISTINCT ' ' || make_standard_name(n) as lookup_token
448                             FROM unnest(%s)n) y
449                       WHERE NOT EXISTS(SELECT * FROM word
450                                        WHERE word_token = lookup_token and country_code = %s))
451                 """, (country_code, list(names.values()), country_code))
452
453
454     def process_place(self, place: PlaceInfo) -> Mapping[str, Any]:
455         """ Determine tokenizer information about the given place.
456
457             Returns a JSON-serialisable structure that will be handed into
458             the database via the token_info field.
459         """
460         assert self.conn is not None
461
462         token_info = _TokenInfo(self._cache)
463
464         names = place.name
465
466         if names:
467             token_info.add_names(self.conn, names)
468
469             if place.is_country():
470                 assert place.country_code is not None
471                 self.add_country_names(place.country_code, names)
472
473         address = place.address
474         if address:
475             self._process_place_address(token_info, address)
476
477         return token_info.data
478
479
480     def _process_place_address(self, token_info: '_TokenInfo', address: Mapping[str, str]) -> None:
481         assert self.conn is not None
482         hnrs = []
483         addr_terms = []
484
485         for key, value in address.items():
486             if key == 'postcode':
487                 # Make sure the normalized postcode is present in the word table.
488                 if re.search(r'[:,;]', value) is None:
489                     norm_pc = self.normalize_postcode(value)
490                     token_info.set_postcode(norm_pc)
491                     self._cache.add_postcode(self.conn, norm_pc)
492             elif key in ('housenumber', 'streetnumber', 'conscriptionnumber'):
493                 hnrs.append(value)
494             elif key == 'street':
495                 token_info.add_street(self.conn, value)
496             elif key == 'place':
497                 token_info.add_place(self.conn, value)
498             elif not key.startswith('_') \
499                  and key not in ('country', 'full', 'inclusion'):
500                 addr_terms.append((key, value))
501
502         if hnrs:
503             token_info.add_housenumbers(self.conn, hnrs)
504
505         if addr_terms:
506             token_info.add_address_terms(self.conn, addr_terms)
507
508
509
510 class _TokenInfo:
511     """ Collect token information to be sent back to the database.
512     """
513     def __init__(self, cache: '_TokenCache') -> None:
514         self.cache = cache
515         self.data: Dict[str, Any] = {}
516
517
518     def add_names(self, conn: Connection, names: Mapping[str, str]) -> None:
519         """ Add token information for the names of the place.
520         """
521         with conn.cursor() as cur:
522             # Create the token IDs for all names.
523             self.data['names'] = cur.scalar("SELECT make_keywords(%s)::text",
524                                             (names, ))
525
526
527     def add_housenumbers(self, conn: Connection, hnrs: Sequence[str]) -> None:
528         """ Extract housenumber information from the address.
529         """
530         if len(hnrs) == 1:
531             token = self.cache.get_housenumber(hnrs[0])
532             if token is not None:
533                 self.data['hnr_tokens'] = token
534                 self.data['hnr'] = hnrs[0]
535                 return
536
537         # split numbers if necessary
538         simple_list: List[str] = []
539         for hnr in hnrs:
540             simple_list.extend((x.strip() for x in re.split(r'[;,]', hnr)))
541
542         if len(simple_list) > 1:
543             simple_list = list(set(simple_list))
544
545         with conn.cursor() as cur:
546             cur.execute("SELECT * FROM create_housenumbers(%s)", (simple_list, ))
547             result = cur.fetchone()
548             assert result is not None
549             self.data['hnr_tokens'], self.data['hnr'] = result
550
551
552     def set_postcode(self, postcode: str) -> None:
553         """ Set or replace the postcode token with the given value.
554         """
555         self.data['postcode'] = postcode
556
557     def add_street(self, conn: Connection, street: str) -> None:
558         """ Add addr:street match terms.
559         """
560         def _get_street(name: str) -> List[int]:
561             with conn.cursor() as cur:
562                 return cast(List[int],
563                             cur.scalar("SELECT word_ids_from_name(%s)::text", (name, )))
564
565         tokens = self.cache.streets.get(street, _get_street)
566         if tokens:
567             self.data['street'] = tokens
568
569
570     def add_place(self, conn: Connection, place: str) -> None:
571         """ Add addr:place search and match terms.
572         """
573         def _get_place(name: str) -> Tuple[List[int], List[int]]:
574             with conn.cursor() as cur:
575                 cur.execute("""SELECT make_keywords(hstore('name' , %s))::text,
576                                       word_ids_from_name(%s)::text""",
577                             (name, name))
578                 return cast(Tuple[List[int], List[int]], cur.fetchone())
579
580         self.data['place_search'], self.data['place_match'] = \
581             self.cache.places.get(place, _get_place)
582
583
584     def add_address_terms(self, conn: Connection, terms: Sequence[Tuple[str, str]]) -> None:
585         """ Add additional address terms.
586         """
587         def _get_address_term(name: str) -> Tuple[List[int], List[int]]:
588             with conn.cursor() as cur:
589                 cur.execute("""SELECT addr_ids_from_name(%s)::text,
590                                       word_ids_from_name(%s)::text""",
591                             (name, name))
592                 return cast(Tuple[List[int], List[int]], cur.fetchone())
593
594         tokens = {}
595         for key, value in terms:
596             items = self.cache.address_terms.get(value, _get_address_term)
597             if items[0] or items[1]:
598                 tokens[key] = items
599
600         if tokens:
601             self.data['addr'] = tokens
602
603
604 class _LRU:
605     """ Least recently used cache that accepts a generator function to
606         produce the item when there is a cache miss.
607     """
608
609     def __init__(self, maxsize: int = 128):
610         self.data: 'OrderedDict[str, Any]' = OrderedDict()
611         self.maxsize = maxsize
612
613
614     def get(self, key: str, generator: Callable[[str], Any]) -> Any:
615         """ Get the item with the given key from the cache. If nothing
616             is found in the cache, generate the value through the
617             generator function and store it in the cache.
618         """
619         value = self.data.get(key)
620         if value is not None:
621             self.data.move_to_end(key)
622         else:
623             value = generator(key)
624             if len(self.data) >= self.maxsize:
625                 self.data.popitem(last=False)
626             self.data[key] = value
627
628         return value
629
630
631 class _TokenCache:
632     """ Cache for token information to avoid repeated database queries.
633
634         This cache is not thread-safe and needs to be instantiated per
635         analyzer.
636     """
637     def __init__(self, conn: Connection):
638         # various LRU caches
639         self.streets = _LRU(maxsize=256)
640         self.places = _LRU(maxsize=128)
641         self.address_terms = _LRU(maxsize=1024)
642
643         # Lookup houseunumbers up to 100 and cache them
644         with conn.cursor() as cur:
645             cur.execute("""SELECT i, ARRAY[getorcreate_housenumber_id(i::text)]::text
646                            FROM generate_series(1, 100) as i""")
647             self._cached_housenumbers: Dict[str, str] = {str(r[0]): r[1] for r in cur}
648
649         # For postcodes remember the ones that have already been added
650         self.postcodes: Set[str] = set()
651
652     def get_housenumber(self, number: str) -> Optional[str]:
653         """ Get a housenumber token from the cache.
654         """
655         return self._cached_housenumbers.get(number)
656
657
658     def add_postcode(self, conn: Connection, postcode: str) -> None:
659         """ Make sure the given postcode is in the database.
660         """
661         if postcode not in self.postcodes:
662             with conn.cursor() as cur:
663                 cur.execute('SELECT create_postcode_id(%s)', (postcode, ))
664             self.postcodes.add(postcode)