--- /dev/null
+@DB
+Feature: Import of simple objects
+ Tests that simple place objects are imported into placex and can be found by a search query.
+
+ @wip
+ Scenario: Import place node
+ Given the places
+ | osm | class | type | name | geometry |
+ | N1 | place | village | 'name' : 'Foo' | 10.0 -10.0 |
+ And the named places
+ | osm | class | type | housenumber |
+ | N2 | place | village | |
+ When importing
+ Then table placex contains
+ | object | class | type | name | centroid |
+ | N1 | place | village | 'name' : 'Foo' | 10.0,-10.0 +- 1m |
+ When sending query "Foo"
+ Then results contain
+ | ID | osm_type | osm_id |
+ | 0 | N | 1 |
'TEST_SETTINGS_FILE' : '/tmp/nominatim_settings.php'
}
+use_step_matcher("re")
+
class NominatimEnvironment(object):
""" Collects all functions for the execution of Nominatim functions.
"""
self.scene_path = os.environ.get('SCENE_PATH',
os.path.join(scriptpath, '..', 'scenes', 'data'))
def make_geometry(self, geom):
    """Wrap a coordinate string in a WKT geometry type.

    The geometry type is chosen purely from the punctuation of ``geom``:

    * no comma: ``POINT(geom)``
    * a comma but no opening parenthesis: ``LINESTRING(geom)``
    * anything else: ``POLYGON(geom)``

    The coordinate text itself is passed through unchanged.
    """
    if ',' not in geom:
        return 'POINT(%s)' % geom

    if '(' not in geom:
        return 'LINESTRING(%s)' % geom

    return 'POLYGON(%s)' % geom
+
def before_all(context):
- for k,v in userconfig.items():
- context.config.userdata.setdefault(k, v)
- print('config:', context.config.userdata)
# logging setup
context.config.setup_logging()
+ # set up -D options
+ for k,v in userconfig.items():
+ context.config.userdata.setdefault(k, v)
+ logging.debug('User config: %s' %(str(context.config.userdata)))
# Nominatim test setup
context.nominatim = NominatimEnvironment(context.config.userdata)
context.osm = OSMDataFactory()
--- /dev/null
+import base64
+import random
+import string
+
+def _format_placex_columns(row, force_name):
+ out = {
+ 'osm_type' : row['osm'][0],
+ 'osm_id' : row['osm'][1:],
+ 'admin_level' : row.get('admin_level', 100)
+ }
+
+ for k in ('class', 'type', 'housenumber', 'street',
+ 'addr_place', 'isin', 'postcode', 'country_code'):
+ if k in row.headings and row[k]:
+ out[k] = row[k]
+
+ if 'name' in row.headings:
+ if row['name'].startswith("'"):
+ out['name'] = eval('{' + row['name'] + '}')
+ else:
+ out['name'] = { 'name' : row['name'] }
+ elif force_name:
+ out['name'] = { 'name' : ''.join(random.choice(string.printable) for _ in range(int(random.random()*30))) }
+
+ if 'extratags' in row.headings:
+ out['extratags'] = eval('{%s}' % row['extratags'])
+
+ return out
+
+
@given("the (?P<named>named )?places")
def add_data_to_place_table(context, named):
    """Insert the rows of the step's table into the place table.

    The place_before_insert trigger is disabled while the rows are
    inserted and re-enabled afterwards. Rows without a ``geometry`` column
    get a random point when they describe a node (OSM type ``N``); for any
    other OSM type a missing geometry raises RuntimeError.

    Args:
        context: behave context; ``context.db``, ``context.table`` and
            ``context.osm`` are used.
        named: text of the optional "named " group of the step, or None.
            When present, rows without a name column get a random name.
    """
    cur = context.db.cursor()
    try:
        cur.execute('ALTER TABLE place DISABLE TRIGGER place_before_insert')
        for r in context.table:
            cols = _format_placex_columns(r, named is not None)

            if 'geometry' in r.headings:
                geometry = "'%s'::geometry" % context.osm.make_geometry(r['geometry'])
            elif cols['osm_type'] == 'N':
                geometry = "ST_Point(%f, %f)" % (random.random()*360 - 180, random.random()*180 - 90)
            else:
                raise RuntimeError("Missing geometry for place")

            # Column names and the geometry expression are formatted into
            # the statement; the column values are bound as parameters.
            query = 'INSERT INTO place (%s, geometry) values(%s, ST_SetSRID(%s, 4326))' % (
                ','.join(cols.keys()),
                ','.join(['%s'] * len(cols)),
                geometry
            )
            cur.execute(query, list(cols.values()))
        cur.execute('ALTER TABLE place ENABLE TRIGGER place_before_insert')
    finally:
        # Release the cursor even when an insert or the geometry check fails.
        cur.close()
    context.db.commit()
+
+
@when("importing")
def import_and_index_data_from_place_table(context):
    """Copy the contents of the place table into the search tables and index.

    Runs the setup script to create the functions, copies every place row
    into placex except ways of class 'place' and type 'houses', hands those
    ways to insert_osmline() instead, commits, and finally runs the setup
    script again for indexing.
    """
    context.nominatim.run_setup_script('create-functions', 'create-partition-functions')
    cur = context.db.cursor()
    try:
        # NOTE(review): 'select *' relies on the column order of place
        # matching the column list given for placex - confirm against the
        # table definitions.
        cur.execute(
            """insert into placex (osm_type, osm_id, class, type, name, admin_level,
               housenumber, street, addr_place, isin, postcode, country_code, extratags,
               geometry)
               select * from place where not (class='place' and type='houses' and osm_type='W')""")
        cur.execute(
            """select insert_osmline (osm_id, housenumber, street, addr_place,
               postcode, country_code, geometry)
               from place where class='place' and type='houses' and osm_type='W'""")
    finally:
        # Release the cursor whether or not the statements succeeded.
        cur.close()
    context.db.commit()
    context.nominatim.run_setup_script('index', 'index-noanalyse')
+
+
# Raw string so that \w reaches the regex engine without relying on Python
# passing an unknown escape sequence through unchanged.
@then(r"table (?P<table>\w+) contains(?P<exact> exactly)?")
def check_table_contents(context, table, exact):
    """Placeholder step: accepts the table check without verifying anything.

    Args:
        context: behave context (unused so far).
        table: name of the table matched from the step text.
        exact: text of the optional " exactly" group, or None.
    """
    pass
--- /dev/null
+""" Steps that run search queries.
+
+ Queries may either be run directly via PHP using the query script
+ or via the HTTP interface.
+"""
+
+import os
+import subprocess
+
+class SearchResponse(object):
+
+ def __init__(response,
+
# The optional " with params" suffix is a non-capturing group so that it is
# not handed to the step function as an additional argument.
@when(u'searching for "(?P<query>.*)"(?: with params)?$')
def query_cmd(context, query):
    """ Query directly via PHP script.

        Runs utils/query.php from the build directory with ``--search
        <query>``. When the step has a table, every non-empty cell of its
        first row is added as ``--<heading> <value>``. The step fails when
        the script exits with a non-zero return code.
    """
    cmd = [os.path.join(context.nominatim.build_dir, 'utils', 'query.php'),
           '--search', query]
    # add more parameters in table form
    if context.table:
        for h in context.table.headings:
            value = context.table[0][h].strip()
            if value:
                cmd.extend(('--' + h, value))

    proc = subprocess.Popen(cmd, cwd=context.nominatim.build_dir,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (outp, err) = proc.communicate()

    # The message must be part of the assertion itself to be reported.
    assert proc.returncode == 0, "query.php failed with message: %s" % err

    # NOTE(review): results are stored on the behave context; confirm the
    # attribute names against the steps that read them.
    context.page = outp
    context.response_format = 'json'
    context.request_type = 'search'
    context.returncode = 200
+