]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tools/tiger_data.py
use tokenizer during Tiger data import
[nominatim.git] / nominatim / tools / tiger_data.py
1 """
2 Functions for importing tiger data and handling tarbar and directory files
3 """
4 import csv
5 import io
6 import logging
7 import os
8 import tarfile
9
10 import psycopg2.extras
11
12 from nominatim.db.connection import connect
13 from nominatim.db.async_connection import WorkerPool
14 from nominatim.db.sql_preprocessor import SQLPreprocessor
15
16
17 LOG = logging.getLogger()
18
19
20 def handle_tarfile_or_directory(data_dir):
21     """ Handles tarfile or directory for importing tiger data
22     """
23
24     tar = None
25     if data_dir.endswith('.tar.gz'):
26         tar = tarfile.open(data_dir)
27         csv_files = [i for i in tar.getmembers() if i.name.endswith('.csv')]
28         LOG.warning("Found %d CSV files in tarfile with path %s", len(csv_files), data_dir)
29         if not csv_files:
30             LOG.warning("Tiger data import selected but no files in tarfile's path %s", data_dir)
31             return None, None
32     else:
33         files = os.listdir(data_dir)
34         csv_files = [os.path.join(data_dir, i) for i in files if i.endswith('.csv')]
35         LOG.warning("Found %d CSV files in path %s", len(csv_files), data_dir)
36         if not csv_files:
37             LOG.warning("Tiger data import selected but no files found in path %s", data_dir)
38             return None, None
39
40     return csv_files, tar
41
42
43 def handle_threaded_sql_statements(pool, fd, analyzer):
44     """ Handles sql statement with multiplexing
45     """
46     lines = 0
47     # Using pool of database connections to execute sql statements
48
49     sql = "SELECT tiger_line_import(%s, %s, %s, %s, %s, %s)"
50
51     for row in csv.DictReader(fd, delimiter=';'):
52         try:
53             address = dict(street=row['street'], postcode=row['postcode'])
54             args = ('SRID=4326;' + row['geometry'],
55                     int(row['from']), int(row['to']), row['interpolation'],
56                     psycopg2.extras.Json(analyzer.process_place(dict(address=address))),
57                     analyzer.normalize_postcode(row['postcode']))
58         except ValueError:
59             continue
60         pool.next_free_worker().perform(sql, args=args)
61
62         lines += 1
63         if lines == 1000:
64             print('.', end='', flush=True)
65             lines = 0
66
67
68 def add_tiger_data(data_dir, config, threads, tokenizer):
69     """ Import tiger data from directory or tar file `data dir`.
70     """
71     dsn = config.get_libpq_dsn()
72     files, tar = handle_tarfile_or_directory(data_dir)
73
74     if not files:
75         return
76
77     with connect(dsn) as conn:
78         sql = SQLPreprocessor(conn, config)
79         sql.run_sql_file(conn, 'tiger_import_start.sql')
80
81     # Reading files and then for each file line handling
82     # sql_query in <threads - 1> chunks.
83     place_threads = max(1, threads - 1)
84
85     with WorkerPool(dsn, place_threads, ignore_sql_errors=True) as pool:
86         with tokenizer.name_analyzer() as analyzer:
87             for fname in files:
88                 if not tar:
89                     fd = open(fname)
90                 else:
91                     fd = io.TextIOWrapper(tar.extractfile(fname))
92
93                 handle_threaded_sql_statements(pool, fd, analyzer)
94
95                 fd.close()
96
97     if tar:
98         tar.close()
99     print('\n')
100     LOG.warning("Creating indexes on Tiger data")
101     with connect(dsn) as conn:
102         sql = SQLPreprocessor(conn, config)
103         sql.run_sql_file(conn, 'tiger_import_finish.sql')