]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tokenizer/legacy_tokenizer.py
factor out connection reset code
[nominatim.git] / nominatim / tokenizer / legacy_tokenizer.py
1 """
2 Tokenizer implementing normalisation as used before Nominatim 4.
3 """
4 from collections import OrderedDict
5 import logging
6 import re
7 import shutil
8 from textwrap import dedent
9
10 from icu import Transliterator
11 import psycopg2
12 import psycopg2.extras
13
14 from nominatim.db.connection import connect
15 from nominatim.db import properties
16 from nominatim.db import utils as db_utils
17 from nominatim.db.sql_preprocessor import SQLPreprocessor
18 from nominatim.errors import UsageError
19
20 DBCFG_NORMALIZATION = "tokenizer_normalization"
21 DBCFG_MAXWORDFREQ = "tokenizer_maxwordfreq"
22
23 LOG = logging.getLogger()
24
25 def create(dsn, data_dir):
26     """ Create a new instance of the tokenizer provided by this module.
27     """
28     return LegacyTokenizer(dsn, data_dir)
29
30
31 def _install_module(config_module_path, src_dir, module_dir):
32     """ Copies the PostgreSQL normalisation module into the project
33         directory if necessary. For historical reasons the module is
34         saved in the '/module' subdirectory and not with the other tokenizer
35         data.
36
37         The function detects when the installation is run from the
38         build directory. It doesn't touch the module in that case.
39     """
40     # Custom module locations are simply used as is.
41     if config_module_path:
42         LOG.info("Using custom path for database module at '%s'", config_module_path)
43         return config_module_path
44
45     # Compatibility mode for builddir installations.
46     if module_dir.exists() and src_dir.samefile(module_dir):
47         LOG.info('Running from build directory. Leaving database module as is.')
48         return module_dir
49
50     # In any other case install the module in the project directory.
51     if not module_dir.exists():
52         module_dir.mkdir()
53
54     destfile = module_dir / 'nominatim.so'
55     shutil.copy(str(src_dir / 'nominatim.so'), str(destfile))
56     destfile.chmod(0o755)
57
58     LOG.info('Database module installed at %s', str(destfile))
59
60     return module_dir
61
62
63 def _check_module(module_dir, conn):
64     """ Try to use the PostgreSQL module to confirm that it is correctly
65         installed and accessible from PostgreSQL.
66     """
67     with conn.cursor() as cur:
68         try:
69             cur.execute("""CREATE FUNCTION nominatim_test_import_func(text)
70                            RETURNS text AS '{}/nominatim.so', 'transliteration'
71                            LANGUAGE c IMMUTABLE STRICT;
72                            DROP FUNCTION nominatim_test_import_func(text)
73                         """.format(module_dir))
74         except psycopg2.DatabaseError as err:
75             LOG.fatal("Error accessing database module: %s", err)
76             raise UsageError("Database module cannot be accessed.") from err
77
78
79 class LegacyTokenizer:
80     """ The legacy tokenizer uses a special PostgreSQL module to normalize
81         names and queries. The tokenizer thus implements normalization through
82         calls to the database.
83     """
84
85     def __init__(self, dsn, data_dir):
86         self.dsn = dsn
87         self.data_dir = data_dir
88         self.normalization = None
89
90
91     def init_new_db(self, config, init_db=True):
92         """ Set up a new tokenizer for the database.
93
94             This copies all necessary data in the project directory to make
95             sure the tokenizer remains stable even over updates.
96         """
97         module_dir = _install_module(config.DATABASE_MODULE_PATH,
98                                      config.lib_dir.module,
99                                      config.project_dir / 'module')
100
101         self.normalization = config.TERM_NORMALIZATION
102
103         self._install_php(config)
104
105         with connect(self.dsn) as conn:
106             _check_module(module_dir, conn)
107             self._save_config(conn, config)
108             conn.commit()
109
110         if init_db:
111             self.update_sql_functions(config)
112             self._init_db_tables(config)
113
114
115     def init_from_project(self):
116         """ Initialise the tokenizer from the project directory.
117         """
118         with connect(self.dsn) as conn:
119             self.normalization = properties.get_property(conn, DBCFG_NORMALIZATION)
120
121
122     def finalize_import(self, config):
123         """ Do any required postprocessing to make the tokenizer data ready
124             for use.
125         """
126         with connect(self.dsn) as conn:
127             sqlp = SQLPreprocessor(conn, config)
128             sqlp.run_sql_file(conn, 'tokenizer/legacy_tokenizer_indices.sql')
129
130
131     def update_sql_functions(self, config):
132         """ Reimport the SQL functions for this tokenizer.
133         """
134         with connect(self.dsn) as conn:
135             max_word_freq = properties.get_property(conn, DBCFG_MAXWORDFREQ)
136             modulepath = config.DATABASE_MODULE_PATH or \
137                          str((config.project_dir / 'module').resolve())
138             sqlp = SQLPreprocessor(conn, config)
139             sqlp.run_sql_file(conn, 'tokenizer/legacy_tokenizer.sql',
140                               max_word_freq=max_word_freq,
141                               modulepath=modulepath)
142
143
144     def check_database(self):
145         """ Check that the tokenizer is set up correctly.
146         """
147         hint = """\
148              The Postgresql extension nominatim.so was not correctly loaded.
149
150              Error: {error}
151
152              Hints:
153              * Check the output of the CMmake/make installation step
154              * Does nominatim.so exist?
155              * Does nominatim.so exist on the database server?
156              * Can nominatim.so be accessed by the database user?
157              """
158         with connect(self.dsn) as conn:
159             with conn.cursor() as cur:
160                 try:
161                     out = cur.scalar("SELECT make_standard_name('a')")
162                 except psycopg2.Error as err:
163                     return hint.format(error=str(err))
164
165         if out != 'a':
166             return hint.format(error='Unexpected result for make_standard_name()')
167
168         return None
169
170
171     def migrate_database(self, config):
172         """ Initialise the project directory of an existing database for
173             use with this tokenizer.
174
175             This is a special migration function for updating existing databases
176             to new software versions.
177         """
178         self.normalization = config.TERM_NORMALIZATION
179         module_dir = _install_module(config.DATABASE_MODULE_PATH,
180                                      config.lib_dir.module,
181                                      config.project_dir / 'module')
182
183         with connect(self.dsn) as conn:
184             _check_module(module_dir, conn)
185             self._save_config(conn, config)
186
187
188     def name_analyzer(self):
189         """ Create a new analyzer for tokenizing names and queries
190             using this tokinzer. Analyzers are context managers and should
191             be used accordingly:
192
193             ```
194             with tokenizer.name_analyzer() as analyzer:
195                 analyser.tokenize()
196             ```
197
198             When used outside the with construct, the caller must ensure to
199             call the close() function before destructing the analyzer.
200
201             Analyzers are not thread-safe. You need to instantiate one per thread.
202         """
203         normalizer = Transliterator.createFromRules("phrase normalizer",
204                                                     self.normalization)
205         return LegacyNameAnalyzer(self.dsn, normalizer)
206
207
208     def _install_php(self, config):
209         """ Install the php script for the tokenizer.
210         """
211         php_file = self.data_dir / "tokenizer.php"
212         php_file.write_text(dedent("""\
213             <?php
214             @define('CONST_Max_Word_Frequency', {0.MAX_WORD_FREQUENCY});
215             @define('CONST_Term_Normalization_Rules', "{0.TERM_NORMALIZATION}");
216             require_once('{0.lib_dir.php}/tokenizer/legacy_tokenizer.php');
217             """.format(config)))
218
219
220     def _init_db_tables(self, config):
221         """ Set up the word table and fill it with pre-computed word
222             frequencies.
223         """
224         with connect(self.dsn) as conn:
225             sqlp = SQLPreprocessor(conn, config)
226             sqlp.run_sql_file(conn, 'tokenizer/legacy_tokenizer_tables.sql')
227             conn.commit()
228
229         LOG.warning("Precomputing word tokens")
230         db_utils.execute_file(self.dsn, config.lib_dir.data / 'words.sql')
231
232
233     def _save_config(self, conn, config):
234         """ Save the configuration that needs to remain stable for the given
235             database as database properties.
236         """
237         properties.set_property(conn, DBCFG_NORMALIZATION, self.normalization)
238         properties.set_property(conn, DBCFG_MAXWORDFREQ, config.MAX_WORD_FREQUENCY)
239
240
241 class LegacyNameAnalyzer:
242     """ The legacy analyzer uses the special Postgresql module for
243         splitting names.
244
245         Each instance opens a connection to the database to request the
246         normalization.
247     """
248
249     def __init__(self, dsn, normalizer):
250         self.conn = connect(dsn).connection
251         self.conn.autocommit = True
252         self.normalizer = normalizer
253         psycopg2.extras.register_hstore(self.conn)
254
255         self._cache = _TokenCache(self.conn)
256
257
258     def __enter__(self):
259         return self
260
261
262     def __exit__(self, exc_type, exc_value, traceback):
263         self.close()
264
265
266     def close(self):
267         """ Free all resources used by the analyzer.
268         """
269         if self.conn:
270             self.conn.close()
271             self.conn = None
272
273
274     def get_word_token_info(self, words):
275         """ Return token information for the given list of words.
276             If a word starts with # it is assumed to be a full name
277             otherwise is a partial name.
278
279             The function returns a list of tuples with
280             (original word, word token, word id).
281
282             The function is used for testing and debugging only
283             and not necessarily efficient.
284         """
285         with self.conn.cursor() as cur:
286             cur.execute("""SELECT t.term, word_token, word_id
287                            FROM word, (SELECT unnest(%s::TEXT[]) as term) t
288                            WHERE word_token = (CASE
289                                    WHEN left(t.term, 1) = '#' THEN
290                                      ' ' || make_standard_name(substring(t.term from 2))
291                                    ELSE
292                                      make_standard_name(t.term)
293                                    END)
294                                  and class is null and country_code is null""",
295                         (words, ))
296
297             return [(r[0], r[1], r[2]) for r in cur]
298
299
300     def normalize(self, phrase):
301         """ Normalize the given phrase, i.e. remove all properties that
302             are irrelevant for search.
303         """
304         return self.normalizer.transliterate(phrase)
305
306
307     @staticmethod
308     def normalize_postcode(postcode):
309         """ Convert the postcode to a standardized form.
310
311             This function must yield exactly the same result as the SQL function
312             'token_normalized_postcode()'.
313         """
314         return postcode.strip().upper()
315
316
317     def update_postcodes_from_db(self):
318         """ Update postcode tokens in the word table from the location_postcode
319             table.
320         """
321         with self.conn.cursor() as cur:
322             # This finds us the rows in location_postcode and word that are
323             # missing in the other table.
324             cur.execute("""SELECT * FROM
325                             (SELECT pc, word FROM
326                               (SELECT distinct(postcode) as pc FROM location_postcode) p
327                               FULL JOIN
328                               (SELECT word FROM word
329                                 WHERE class ='place' and type = 'postcode') w
330                               ON pc = word) x
331                            WHERE pc is null or word is null""")
332
333             to_delete = []
334             to_add = []
335
336             for postcode, word in cur:
337                 if postcode is None:
338                     to_delete.append(word)
339                 else:
340                     to_add.append(postcode)
341
342             if to_delete:
343                 cur.execute("""DELETE FROM WORD
344                                WHERE class ='place' and type = 'postcode'
345                                      and word = any(%s)
346                             """, (to_delete, ))
347             if to_add:
348                 cur.execute("""SELECT count(create_postcode_id(pc))
349                                FROM unnest(%s) as pc
350                             """, (to_add, ))
351
352
353
354     def update_special_phrases(self, phrases, should_replace):
355         """ Replace the search index for special phrases with the new phrases.
356         """
357         norm_phrases = set(((self.normalize(p[0]), p[1], p[2], p[3])
358                             for p in phrases))
359
360         with self.conn.cursor() as cur:
361             # Get the old phrases.
362             existing_phrases = set()
363             cur.execute("""SELECT word, class, type, operator FROM word
364                            WHERE class != 'place'
365                                  OR (type != 'house' AND type != 'postcode')""")
366             for label, cls, typ, oper in cur:
367                 existing_phrases.add((label, cls, typ, oper or '-'))
368
369             to_add = norm_phrases - existing_phrases
370             to_delete = existing_phrases - norm_phrases
371
372             if to_add:
373                 psycopg2.extras.execute_values(
374                     cur,
375                     """ INSERT INTO word (word_id, word_token, word, class, type,
376                                           search_name_count, operator)
377                         (SELECT nextval('seq_word'), ' ' || make_standard_name(name), name,
378                                 class, type, 0,
379                                 CASE WHEN op in ('in', 'near') THEN op ELSE null END
380                            FROM (VALUES %s) as v(name, class, type, op))""",
381                     to_add)
382
383             if to_delete and should_replace:
384                 psycopg2.extras.execute_values(
385                     cur,
386                     """ DELETE FROM word USING (VALUES %s) as v(name, in_class, in_type, op)
387                         WHERE word = name and class = in_class and type = in_type
388                               and ((op = '-' and operator is null) or op = operator)""",
389                     to_delete)
390
391         LOG.info("Total phrases: %s. Added: %s. Deleted: %s",
392                  len(norm_phrases), len(to_add), len(to_delete))
393
394
395     def add_country_names(self, country_code, names):
396         """ Add names for the given country to the search index.
397         """
398         with self.conn.cursor() as cur:
399             cur.execute(
400                 """INSERT INTO word (word_id, word_token, country_code)
401                    (SELECT nextval('seq_word'), lookup_token, %s
402                       FROM (SELECT DISTINCT ' ' || make_standard_name(n) as lookup_token
403                             FROM unnest(%s)n) y
404                       WHERE NOT EXISTS(SELECT * FROM word
405                                        WHERE word_token = lookup_token and country_code = %s))
406                 """, (country_code, list(names.values()), country_code))
407
408
409     def process_place(self, place):
410         """ Determine tokenizer information about the given place.
411
412             Returns a JSON-serialisable structure that will be handed into
413             the database via the token_info field.
414         """
415         token_info = _TokenInfo(self._cache)
416
417         names = place.get('name')
418
419         if names:
420             token_info.add_names(self.conn, names)
421
422             country_feature = place.get('country_feature')
423             if country_feature and re.fullmatch(r'[A-Za-z][A-Za-z]', country_feature):
424                 self.add_country_names(country_feature.lower(), names)
425
426         address = place.get('address')
427         if address:
428             self._process_place_address(token_info, address)
429
430         return token_info.data
431
432
433     def _process_place_address(self, token_info, address):
434         hnrs = []
435         addr_terms = []
436
437         for key, value in address.items():
438             if key == 'postcode':
439                 # Make sure the normalized postcode is present in the word table.
440                 if re.search(r'[:,;]', value) is None:
441                     self._cache.add_postcode(self.conn,
442                                              self.normalize_postcode(value))
443             elif key in ('housenumber', 'streetnumber', 'conscriptionnumber'):
444                 hnrs.append(value)
445             elif key == 'street':
446                 token_info.add_street(self.conn, value)
447             elif key == 'place':
448                 token_info.add_place(self.conn, value)
449             elif not key.startswith('_') and key not in ('country', 'full'):
450                 addr_terms.append((key, value))
451
452         if hnrs:
453             token_info.add_housenumbers(self.conn, hnrs)
454
455         if addr_terms:
456             token_info.add_address_terms(self.conn, addr_terms)
457
458
459
460 class _TokenInfo:
461     """ Collect token information to be sent back to the database.
462     """
463     def __init__(self, cache):
464         self.cache = cache
465         self.data = {}
466
467
468     def add_names(self, conn, names):
469         """ Add token information for the names of the place.
470         """
471         with conn.cursor() as cur:
472             # Create the token IDs for all names.
473             self.data['names'] = cur.scalar("SELECT make_keywords(%s)::text",
474                                             (names, ))
475
476
477     def add_housenumbers(self, conn, hnrs):
478         """ Extract housenumber information from the address.
479         """
480         if len(hnrs) == 1:
481             token = self.cache.get_housenumber(hnrs[0])
482             if token is not None:
483                 self.data['hnr_tokens'] = token
484                 self.data['hnr'] = hnrs[0]
485                 return
486
487         # split numbers if necessary
488         simple_list = []
489         for hnr in hnrs:
490             simple_list.extend((x.strip() for x in re.split(r'[;,]', hnr)))
491
492         if len(simple_list) > 1:
493             simple_list = list(set(simple_list))
494
495         with conn.cursor() as cur:
496             cur.execute("SELECT (create_housenumbers(%s)).* ", (simple_list, ))
497             self.data['hnr_tokens'], self.data['hnr'] = cur.fetchone()
498
499
500     def add_street(self, conn, street):
501         """ Add addr:street match terms.
502         """
503         def _get_street(name):
504             with conn.cursor() as cur:
505                 return cur.scalar("SELECT word_ids_from_name(%s)::text", (name, ))
506
507         self.data['street'] = self.cache.streets.get(street, _get_street)
508
509
510     def add_place(self, conn, place):
511         """ Add addr:place search and match terms.
512         """
513         def _get_place(name):
514             with conn.cursor() as cur:
515                 cur.execute("""SELECT make_keywords(hstore('name' , %s))::text,
516                                       word_ids_from_name(%s)::text""",
517                             (name, name))
518                 return cur.fetchone()
519
520         self.data['place_search'], self.data['place_match'] = \
521             self.cache.places.get(place, _get_place)
522
523
524     def add_address_terms(self, conn, terms):
525         """ Add additional address terms.
526         """
527         def _get_address_term(name):
528             with conn.cursor() as cur:
529                 cur.execute("""SELECT addr_ids_from_name(%s)::text,
530                                       word_ids_from_name(%s)::text""",
531                             (name, name))
532                 return cur.fetchone()
533
534         tokens = {}
535         for key, value in terms:
536             tokens[key] = self.cache.address_terms.get(value, _get_address_term)
537
538         self.data['addr'] = tokens
539
540
541 class _LRU:
542     """ Least recently used cache that accepts a generator function to
543         produce the item when there is a cache miss.
544     """
545
546     def __init__(self, maxsize=128, init_data=None):
547         self.data = init_data or OrderedDict()
548         self.maxsize = maxsize
549         if init_data is not None and len(init_data) > maxsize:
550             self.maxsize = len(init_data)
551
552     def get(self, key, generator):
553         """ Get the item with the given key from the cache. If nothing
554             is found in the cache, generate the value through the
555             generator function and store it in the cache.
556         """
557         value = self.data.get(key)
558         if value is not None:
559             self.data.move_to_end(key)
560         else:
561             value = generator(key)
562             if len(self.data) >= self.maxsize:
563                 self.data.popitem(last=False)
564             self.data[key] = value
565
566         return value
567
568
569 class _TokenCache:
570     """ Cache for token information to avoid repeated database queries.
571
572         This cache is not thread-safe and needs to be instantiated per
573         analyzer.
574     """
575     def __init__(self, conn):
576         # various LRU caches
577         self.streets = _LRU(maxsize=256)
578         self.places = _LRU(maxsize=128)
579         self.address_terms = _LRU(maxsize=1024)
580
581         # Lookup houseunumbers up to 100 and cache them
582         with conn.cursor() as cur:
583             cur.execute("""SELECT i, ARRAY[getorcreate_housenumber_id(i::text)]::text
584                            FROM generate_series(1, 100) as i""")
585             self._cached_housenumbers = {str(r[0]) : r[1] for r in cur}
586
587         # For postcodes remember the ones that have already been added
588         self.postcodes = set()
589
590     def get_housenumber(self, number):
591         """ Get a housenumber token from the cache.
592         """
593         return self._cached_housenumbers.get(number)
594
595
596     def add_postcode(self, conn, postcode):
597         """ Make sure the given postcode is in the database.
598         """
599         if postcode not in self.postcodes:
600             with conn.cursor() as cur:
601                 cur.execute('SELECT create_postcode_id(%s)', (postcode, ))
602             self.postcodes.add(postcode)