def load_data(dsn: str, threads: int) -> None:
    """ Copy data from the place table into the placex table and into the
        address interpolation table location_property_osmline.

        The placex copy is split into `max(1, threads - 1)` chunks by
        `osm_id % <number of chunks>`. Each chunk, plus the interpolation
        copy, is started with `perform()` on its own DBConnection. The
        function then waits for all of them via a selector and finally runs
        ANALYSE on the database.

        Arguments:
            dsn: Connection string used for DBConnection and connect().
            threads: Total number of connections to use. One is reserved
                for the interpolation copy; the remaining ones (at least
                one) share the placex copy.

        If any step raises, all connections that are still open are closed
        before the exception propagates.
    """
    # One connection is reserved for interpolations, the rest copy placex.
    place_threads = max(1, threads - 1)

    with selectors.DefaultSelector() as sel:
        try:
            for imod in range(place_threads):
                conn = DBConnection(dsn)
                conn.connect()
                # Register right after connecting so that the cleanup in
                # the finally clause also covers a failing perform().
                sel.register(conn, selectors.EVENT_READ, conn)
                conn.perform(
                    pysql.SQL("""INSERT INTO placex ({columns})
                                   SELECT {columns} FROM place
                                   WHERE osm_id % {total} = {mod}
                                   AND NOT (class='place' and (type='houses' or type='postcode'))
                                   AND ST_IsValid(geometry)
                              """).format(columns=_COPY_COLUMNS,
                                          total=pysql.Literal(place_threads),
                                          mod=pysql.Literal(imod)))

            # Address interpolations go into another table.
            conn = DBConnection(dsn)
            conn.connect()
            sel.register(conn, selectors.EVENT_READ, conn)
            conn.perform("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
                              SELECT osm_id, address, geometry FROM place
                              WHERE class='place' and type='houses' and osm_type='W'
                              and ST_GeometryType(geometry) = 'ST_LineString'
                         """)

            # Wait until every connection has finished its query. A progress
            # dot is printed after each select() round (1 second timeout).
            while sel.get_map():
                for key, _ in sel.select(1):
                    conn = key.data
                    conn.wait()
                    # Unregister only after a successful wait(), so that a
                    # connection whose query failed is still closed below.
                    sel.unregister(conn)
                    conn.close()
                print('.', end='', flush=True)
            print('\n')
        finally:
            # On the success path the map is empty here. On error, make
            # sure no connection is left open.
            for key in list(sel.get_map().values()):
                key.data.close()

    with connect(dsn) as syn_conn:
        with syn_conn.cursor() as cur:
            cur.execute('ANALYSE')
+
+
def create_search_indices(conn: Connection, config: Configuration,
                          drop: bool = False, threads: int = 1) -> None:
    """ Create the search indices by running the SQL file 'indices.sql'.

        PostgreSQL may report indices as invalid, for example when an earlier
        index creation failed. Any such index is dropped first so that the
        SQL script recreates it.

        Arguments:
            conn: Open connection used to find and drop invalid indices.
                It is also handed to the SQL preprocessor.
            config: Configuration used for the SQL preprocessor and to
                obtain the libpq DSN for the parallel run.
            drop: Passed through unchanged as the `drop` keyword of
                `run_parallel_sql_file()`.
            threads: Requested parallelism. At most 8 connections are used.
    """
    # If index creation failed and left an index invalid, it needs to be
    # cleaned out first, so that the script recreates it.
    with conn.cursor() as cur:
        cur.execute("""SELECT relname FROM pg_class, pg_index
                       WHERE pg_index.indisvalid = false
                       AND pg_index.indexrelid = pg_class.oid""")
        # Fully consume the result before reusing the cursor for DROP INDEX.
        bad_indices = [row[0] for row in cur]
        for idx in bad_indices:
            LOG.info("Drop invalid index %s.", idx)
            cur.execute(pysql.SQL('DROP INDEX {}').format(pysql.Identifier(idx)))
    conn.commit()

    sql = SQLPreprocessor(conn, config)

    sql.run_parallel_sql_file(config.get_libpq_dsn(),
                              'indices.sql', min(8, threads), drop=drop)