import os
import io
import re
+import logging
from tidylib import tidy_document
import xml.etree.ElementTree as ET
import subprocess
from collections import OrderedDict
from nose.tools import * # for assert functions
+logger = logging.getLogger(__name__)
+
BASE_SERVER_ENV = {
'HTTP_HOST' : 'localhost',
'HTTP_USER_AGENT' : 'Mozilla/5.0 (X11; Linux x86_64; rv:51.0) Gecko/20100101 Firefox/51.0',
'HTTP_ACCEPT' : 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
- 'HTTP_ACCEPT_LANGUAGE' : 'en,de;q=0.5',
'HTTP_ACCEPT_ENCODING' : 'gzip, deflate',
'HTTP_CONNECTION' : 'keep-alive',
'SERVER_SIGNATURE' : '<address>Nominatim BDD Tests</address>',
self.result[-1]['address'] = address
-
class ReverseResponse(GenericResponse):
def __init__(self, page, fmt='json', errorcode=200):
"Unknown XML tag %s on page: %s" % (child.tag, self.page)
class DetailsResponse(GenericResponse):
    """Response wrapper for the output of a details request.

    Stores the raw page, the requested format and the status code. When the
    status code is 200, the ``parse_<fmt>`` method matching the format is
    looked up on the instance and called.
    """

    def __init__(self, page, fmt='json', errorcode=200):
        """Store the response data and parse it if the request succeeded.

        page      -- raw response body
        fmt       -- format name used to select the ``parse_<fmt>`` method
        errorcode -- status code of the response; only 200 triggers parsing
        """
        self.page = page
        self.format = fmt
        self.errorcode = errorcode
        self.result = []
        self.header = {}

        # Anything but a 200 response is stored unparsed.
        if errorcode != 200:
            return

        parser = getattr(self, 'parse_' + fmt)
        parser()

    def parse_html(self):
        """Run the page through tidylib and reset the result to an empty dict.

        The tidied document and tidylib's error report are not used further.
        """
        _tidied, _tidy_errors = tidy_document(
            self.page, options={'char-encoding' : 'utf8'})
        self.result = {}
+
@when(u'searching for "(?P<query>.*)"(?P<dups> with dups)?')
def query_cmd(context, query, dups):
""" Query directly via PHP script.
for h in context.table.headings:
params[h] = context.table[0][h]
- env = BASE_SERVER_ENV
+ env = dict(BASE_SERVER_ENV)
env['QUERY_STRING'] = urlencode(params)
env['SCRIPT_NAME'] = '/%s.php' % endpoint
'%s.php' % endpoint)
env['NOMINATIM_SETTINGS'] = context.nominatim.local_settings_file
+ logger.debug("Environment:" + json.dumps(env, sort_keys=True, indent=2))
+
+ if hasattr(context, 'http_headers'):
+ env.update(context.http_headers)
+
cmd = ['/usr/bin/php-cgi', env['SCRIPT_FILENAME']]
for k,v in params.items():
cmd.append("%s=%s" % (k, v))
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(outp, err) = proc.communicate()
+ outp = outp.decode('utf-8')
+
+ logger.debug("Result: \n===============================\n"
+ + outp + "\n===============================\n")
assert_equals(0, proc.returncode,
"query.php failed with message: %s\noutput: %s" % (err, outp))
assert_equals(0, len(err), "Unexpected PHP error: %s" % (err))
- outp = outp.decode('utf-8')
-
if outp.startswith('Status: '):
status = int(outp[8:11])
else:
return outp[content_start + 4:], status
@given(u'the HTTP header')
def add_http_header(context):
    """Record the first row of the step table as HTTP header variables.

    Each column heading is upper-cased, has '-' replaced by '_' and gets the
    prefix 'HTTP_'. The resulting name is stored in ``context.http_headers``
    together with the value found under that heading in the first table row.
    The dictionary is created on the context if it does not exist yet;
    existing entries with the same name are overwritten.
    """
    if not hasattr(context, 'http_headers'):
        context.http_headers = {}

    context.http_headers.update(
        ('HTTP_' + heading.upper().replace('-', '_'),
         context.table[0][heading])
        for heading in context.table.headings)
+
@when(u'sending (?P<fmt>\S+ )?search query "(?P<query>.*)"(?P<addr> with address)?')
def website_search_request(context, fmt, query, addr):
-
params = {}
if query:
params['q'] = query
context.response = ReverseResponse(outp, outfmt, status)
@when(u'sending (?P<fmt>\\S+ )?details query for (?P<query>.*)')
def website_details_request(context, fmt, query):
    """Send a details query and store the parsed response on the context.

    A query starting with 'N', 'W' or 'R' is split into ``osmtype`` (the
    first character) and ``osmid`` (the remainder). Any other query,
    including an empty one, is passed on unchanged as ``place_id``.

    context -- behave context; receives the result in ``context.response``
    fmt     -- optional format captured from the step text (includes the
               trailing space), handed on to ``send_api_query``
    query   -- object specification captured from the step text
    """
    params = {}
    # Compare a slice against a tuple: the pattern allows an empty query,
    # for which query[0] would raise IndexError and '' in 'NWR' would be True.
    if query[:1] in ('N', 'W', 'R'):
        params['osmtype'] = query[0]
        params['osmid'] = query[1:]
    else:
        params['place_id'] = query
    outp, status = send_api_query('details', params, fmt, context)

    # NOTE(review): the response is always parsed as 'html', whatever fmt
    # was requested -- confirm this is intended.
    context.response = DetailsResponse(outp, 'html', status)
+
@when(u'sending (?P<fmt>\\S+ )?lookup query for (?P<query>.*)')
def website_lookup_request(context, fmt, query):
    """Send a lookup query and store the parsed response on the context.

    context -- behave context; receives the result in ``context.response``
    fmt     -- optional format captured from the step text (includes the
               trailing space), handed on to ``send_api_query``
    query   -- value sent as the ``osm_ids`` parameter
    """
    params = {'osm_ids': query}
    outp, status = send_api_query('lookup', params, fmt, context)

    # The captured format keeps its trailing space, hence 'json '. Every
    # other value, including a missing format, is parsed as XML.
    outfmt = 'json' if fmt == 'json ' else 'xml'

    context.response = SearchResponse(outp, outfmt, status)
@step(u'(?P<operator>less than|more than|exactly|at least|at most) (?P<number>\d+) results? (?:is|are) returned')