- sel = selectors.DefaultSelector()
- # Then copy data from place to placex in <threads - 1> chunks.
- place_threads = max(1, threads - 1)
- for imod in range(place_threads):
- conn = DBConnection(dsn)
- conn.connect()
- conn.perform(
- pysql.SQL("""INSERT INTO placex ({columns})
- SELECT {columns} FROM place
- WHERE osm_id % {total} = {mod}
- AND NOT (class='place' and (type='houses' or type='postcode'))
- AND ST_IsValid(geometry)
- """).format(columns=_COPY_COLUMNS,
- total=pysql.Literal(place_threads),
- mod=pysql.Literal(imod)))
- sel.register(conn, selectors.EVENT_READ, conn)
-
- # Address interpolations go into another table.
- conn = DBConnection(dsn)
- conn.connect()
- conn.perform("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
- SELECT osm_id, address, geometry FROM place
- WHERE class='place' and type='houses' and osm_type='W'
- and ST_GeometryType(geometry) = 'ST_LineString'
- """)
- sel.register(conn, selectors.EVENT_READ, conn)
-
- # Now wait for all of them to finish.
- todo = place_threads + 1
- while todo > 0:
- for key, _ in sel.select(1):
- conn = key.data
- sel.unregister(conn)
- conn.wait()
- conn.close()
- todo -= 1
+ placex_threads = max(1, threads - 1)
+
+ progress = asyncio.create_task(_progress_print())
+
+ async with QueryPool(dsn, placex_threads + 1) as pool:
+ # Copy data from place to placex in <threads - 1> chunks.
+ for imod in range(placex_threads):
+ await pool.put_query(
+ pysql.SQL("""INSERT INTO placex ({columns})
+ SELECT {columns} FROM place
+ WHERE osm_id % {total} = {mod}
+ AND NOT (class='place'
+ and (type='houses' or type='postcode'))
+ AND ST_IsValid(geometry)
+ """).format(columns=_COPY_COLUMNS,
+ total=pysql.Literal(placex_threads),
+ mod=pysql.Literal(imod)), None)
+
+ # Interpolations need to be copied seperately
+ await pool.put_query("""
+ INSERT INTO location_property_osmline (osm_id, address, linegeo)
+ SELECT osm_id, address, geometry FROM place
+ WHERE class='place' and type='houses' and osm_type='W'
+ and ST_GeometryType(geometry) = 'ST_LineString' """, None)
+
+ progress.cancel()
+
+ async with await psycopg.AsyncConnection.connect(dsn) as aconn:
+ await aconn.execute('ANALYSE')
+
+
+async def _progress_print() -> None:
+ while True:
+ try:
+ await asyncio.sleep(1)
+ except asyncio.CancelledError:
+ print('', flush=True)
+ break