5 def _format_placex_columns(row, force_name):
7 'osm_type' : row['osm'][0],
8 'osm_id' : row['osm'][1:],
9 'admin_level' : row.get('admin_level', 100)
12 for k in ('class', 'type', 'housenumber', 'street',
13 'addr_place', 'isin', 'postcode', 'country_code'):
14 if k in row.headings and row[k]:
17 if 'name' in row.headings:
18 if row['name'].startswith("'"):
19 out['name'] = eval('{' + row['name'] + '}')
21 out['name'] = { 'name' : row['name'] }
23 out['name'] = { 'name' : ''.join(random.choice(string.printable) for _ in range(int(random.random()*30))) }
25 if 'extratags' in row.headings:
26 out['extratags'] = eval('{%s}' % row['extratags'])
31 @given("the (?P<named>named )?places")
32 def add_data_to_place_table(context, named):
33 cur = context.db.cursor()
34 cur.execute('ALTER TABLE place DISABLE TRIGGER place_before_insert')
35 for r in context.table:
36 cols = _format_placex_columns(r, named is not None)
38 if 'geometry' in r.headings:
39 geometry = "'%s'::geometry" % context.osm.make_geometry(r['geometry'])
40 elif cols['osm_type'] == 'N':
41 geometry = "ST_Point(%f, %f)" % (random.random()*360 - 180, random.random()*180 - 90)
43 raise RuntimeError("Missing geometry for place")
45 query = 'INSERT INTO place (%s, geometry) values(%s, ST_SetSRID(%s, 4326))' % (
46 ','.join(cols.keys()),
47 ','.join(['%s' for x in range(len(cols))]),
50 cur.execute(query, list(cols.values()))
51 cur.execute('ALTER TABLE place ENABLE TRIGGER place_before_insert')
57 def import_and_index_data_from_place_table(context):
58 context.nominatim.run_setup_script('create-functions', 'create-partition-functions')
59 cur = context.db.cursor()
61 """insert into placex (osm_type, osm_id, class, type, name, admin_level,
62 housenumber, street, addr_place, isin, postcode, country_code, extratags,
64 select * from place where not (class='place' and type='houses' and osm_type='W')""")
66 """select insert_osmline (osm_id, housenumber, street, addr_place,
67 postcode, country_code, geometry)
68 from place where class='place' and type='houses' and osm_type='W'""")
70 context.nominatim.run_setup_script('index', 'index-noanalyse')
73 @then("table (?P<table>\w+) contains(?P<exact> exactly)?")
74 def check_table_contents(context, table, exact):