#!/bin/bash
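+ # psqlcmd pipes both stdout and stderr (|&) through grep, dropping the
+ # expected notices from "DROP TABLE IF EXISTS" and the error messages for
+ # rows rejected by check constraints, so that only real problems show up.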
psqlcmd() {
- psql --quiet wikiprocessingdb
+ psql --quiet wikiprocessingdb |& \
+ grep -v 'does not exist, skipping' |& \
+ grep -v 'violates check constraint' |& \
+ grep -vi 'Failing row contains'
}
mysql2pgsqlcmd() {
- ./mysql2pgsql.perl /dev/stdin /dev/stdout
+ ./mysql2pgsql.perl --nodrop /dev/stdin /dev/stdout
}
download() {
echo "Downloading $1"
- wget --quiet --no-clobber --tries 3
+ wget --quiet --no-clobber --tries=3 "$1"
}
for i in "${LANGUAGES[@]}"
do
echo "Language: $i"
+ # We pre-create the table schema. This allows us to
+ # 1. Skip index creation. Most queries we do are full table scans.
+ # 2. Add a constraint to only import namespace=0 (wikipedia articles).
+ # Both cut down data size considerably (50%+).
+
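+ # For example (an illustrative sketch, not necessarily how the full script
+ # declares it), such a constraint could live directly in the column definition:
+ #     pl_namespace int NOT NULL DEFAULT '0' CHECK (pl_namespace = 0),
+ # Rows outside namespace 0 then fail on import, and their error messages are
+ # swallowed by the 'violates check constraint' filter in psqlcmd above.
+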
echo "Importing ${i}wiki-latest-pagelinks"
- gzip -dc ${i}wiki-latest-pagelinks.sql.gz | sed "s/\`pagelinks\`/\`${i}pagelinks\`/g" | mysql2pgsqlcmd | psqlcmd
+
+ echo "DROP TABLE IF EXISTS ${i}pagelinks;" | psqlcmd
+ echo "CREATE TABLE ${i}pagelinks (
+ pl_from int NOT NULL DEFAULT '0',
+ pl_namespace int NOT NULL DEFAULT '0',
+ pl_title text NOT NULL DEFAULT '',
+ pl_from_namespace int NOT NULL DEFAULT '0'
+ );" | psqlcmd
+
+ time \
+ gzip -dc ${i}wiki-latest-pagelinks.sql.gz | \
+ sed "s/\`pagelinks\`/\`${i}pagelinks\`/g" | \
+ mysql2pgsqlcmd | \
+ grep -v '^CREATE INDEX ' | \
+ psqlcmd
+
+
+
echo "Importing ${i}wiki-latest-page"
- gzip -dc ${i}wiki-latest-page.sql.gz | sed "s/\`page\`/\`${i}page\`/g" | mysql2pgsqlcmd | psqlcmd
+
+ # page_id is declared as a plain 4-byte int here; the MySQL dump's
+ # autoincrement column would otherwise be converted to serial8.
+ echo "DROP TABLE IF EXISTS ${i}page;" | psqlcmd
+ echo "CREATE TABLE ${i}page (
+ page_id int NOT NULL,
+ page_namespace int NOT NULL DEFAULT '0',
+ page_title text NOT NULL DEFAULT '',
+ page_restrictions text NOT NULL,
+ page_is_redirect smallint NOT NULL DEFAULT '0',
+ page_is_new smallint NOT NULL DEFAULT '0',
+ page_random double precision NOT NULL DEFAULT '0',
+ page_touched text NOT NULL DEFAULT '',
+ page_links_updated text DEFAULT NULL,
+ page_latest int NOT NULL DEFAULT '0',
+ page_len int NOT NULL DEFAULT '0',
+ page_content_model text DEFAULT NULL,
+ page_lang text DEFAULT NULL
+ );" | psqlcmd
+
+ time \
+ gzip -dc ${i}wiki-latest-page.sql.gz | \
+ sed "s/\`page\`/\`${i}page\`/g" | \
+ mysql2pgsqlcmd | \
+ grep -v '^CREATE INDEX ' | \
+ psqlcmd
+
+
+
echo "Importing ${i}wiki-latest-langlinks"
- gzip -dc ${i}wiki-latest-langlinks.sql.gz | sed "s/\`langlinks\`/\`${i}langlinks\`/g" | mysql2pgsqlcmd | psqlcmd
+
+ echo "DROP TABLE IF EXISTS ${i}langlinks;" | psqlcmd
+ echo "CREATE TABLE ${i}langlinks (
+ ll_from int NOT NULL DEFAULT '0',
+ ll_lang text NOT NULL DEFAULT '',
+ ll_title text NOT NULL DEFAULT ''
+ );" | psqlcmd
+
+ time \
+ gzip -dc ${i}wiki-latest-langlinks.sql.gz | \
+ sed "s/\`langlinks\`/\`${i}langlinks\`/g" | \
+ mysql2pgsqlcmd | \
+ grep -v '^CREATE INDEX ' | \
+ psqlcmd
+
+
+
+
echo "Importing ${i}wiki-latest-redirect"
- gzip -dc ${i}wiki-latest-redirect.sql.gz | sed "s/\`redirect\`/\`${i}redirect\`/g" | mysql2pgsqlcmd | psqlcmd
+
+ echo "DROP TABLE IF EXISTS ${i}redirect;" | psqlcmd
+ echo "CREATE TABLE ${i}redirect (
+ rd_from int NOT NULL DEFAULT '0',
+ rd_namespace int NOT NULL DEFAULT '0',
+ rd_title text NOT NULL DEFAULT '',
+ rd_interwiki text DEFAULT NULL,
+ rd_fragment text DEFAULT NULL
+ );" | psqlcmd
+
+ time \
+ gzip -dc ${i}wiki-latest-redirect.sql.gz | \
+ sed "s/\`redirect\`/\`${i}redirect\`/g" | \
+ mysql2pgsqlcmd | \
+ grep -v '^CREATE INDEX ' | \
+ psqlcmd
done
echo "CREATE TABLE ${i}pagelinkcount
AS
SELECT pl_title AS title,
- COUNT(*) AS count
+ COUNT(*) AS count,
+ 0::bigint as othercount
FROM ${i}pagelinks
WHERE pl_namespace = 0
GROUP BY pl_title
;" | psqlcmd
- echo "ALTER TABLE ${i}pagelinkcount
- ADD COLUMN othercount integer
- ;" | psqlcmd
+done
- echo "UPDATE ${i}pagelinkcount
- SET othercount = 0
- ;" | psqlcmd
+for i in "${LANGUAGES[@]}"
+do
for j in "${LANGUAGES[@]}"
do
echo "UPDATE ${i}pagelinkcount
echo "DROP TABLE ${i}redirect;" | psqlcmd
echo "DROP TABLE ${i}pagelinkcount;" | psqlcmd
done
+
+echo "all done."
log = logging.getLogger()
def make_connection(options, asynchronous=False):
- return psycopg2.connect(dbname=options.dbname, user=options.user,
- password=options.password, host=options.host,
- port=options.port, async_=asynchronous)
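+ # Passing the parameters as a dict avoids writing 'async' as a literal
+ # keyword argument: 'async' is a reserved word from Python 3.7 on, and the
+ # 'async_' alias only exists in newer psycopg2 releases.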
+ params = {'dbname' : options.dbname,
+ 'user' : options.user,
+ 'password' : options.password,
+ 'host' : options.host,
+ 'port' : options.port,
+ 'async' : asynchronous}
+
+ return psycopg2.connect(**params)
class RankRunner(object):
"""
def __init__(self, options):
+ self.current_query = None
+ self.current_params = None
+
+ self.conn = None
+ self.connect()
+
+ def connect(self):
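+ """ (Re)create the database connection and cursor, closing any
+ previously open connection first.
+ """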
+ if self.conn is not None:
+ self.cursor.close()
+ self.conn.close()
+
self.conn = make_connection(options, asynchronous=True)
self.wait()
self.cursor = self.conn.cursor()
- self.current_query = None
- self.current_params = None
-
def wait(self):
""" Block until any pending operation is done.
"""
- wait_select(self.conn)
- self.current_query = None
+ while True:
+ try:
+ wait_select(self.conn)
+ self.current_query = None
+ return
+ except psycopg2.extensions.TransactionRollbackError as e:
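+ # PostgreSQL reports deadlocks with error code 40P01; the server has
+ # already aborted our statement, so simply re-running it is the
+ # intended recovery here.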
+ if e.pgcode == '40P01':
+ log.info("Deadlock detected (params = {}), retry."
+ .format(self.current_params))
+ self.cursor.execute(self.current_query, self.current_params)
+ else:
+ raise
+ except psycopg2.errors.DeadlockDetected:
+ self.cursor.execute(self.current_query, self.current_params)
def perform(self, sql, args=None):
""" Send SQL query to the server. Returns immediately without
self.cursor.execute(self.current_query, self.current_params)
else:
raise
+ except psycopg2.errors.DeadlockDetected:
+ self.cursor.execute(self.current_query, self.current_params)
return False
sending a query.
"""
ready = self.threads
+ command_stat = 0
while True:
for thread in ready:
if thread.is_done():
+ command_stat += 1
yield thread
- ready, _, _ = select.select(self.threads, [], [])
+ # Refresh the connections occasionally to avoid potential
+ # memory leaks in PostgreSQL.
+ if command_stat > 100000:
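+ # Wait for every thread to finish its in-flight query before
+ # closing and reopening its connection.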
+ for t in self.threads:
+ while not t.is_done():
+ wait_select(t.conn)
+ t.connect()
+ command_stat = 0
+ ready = self.threads
+ else:
+ ready, _, _ = select.select(self.threads, [], [])
assert False, "Unreachable code"
planet extract. A precompiled PBF with the necessary data can be downloaded from
https://www.nominatim.org/data/test/nominatim-api-testdata.pbf
+You need at least 2GB of RAM and 10GB of disk space.
+
The polygons defining the extract can be found in the test/testdb
directory. There is also a reduced set of wikipedia data for this extract,
which you need to import as well (a hypothetical sketch of that step follows
the Tiger commands below). For the Tiger tests, the data for South Dakota is
required. Get the Tiger files `46*`:
-The official test dataset is derived from the 160725 planet. Newer
+ cd Nominatim/data
+ wget https://nominatim.org/data/tiger2018-nominatim-preprocessed.tar.gz
+ tar xvf tiger2018-nominatim-preprocessed.tar.gz --wildcards --no-anchored '46*'
+ rm tiger2018-nominatim-preprocessed.tar.gz
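+
+The reduced wikipedia data mentioned above is imported the same way; a
+hypothetical sketch (file name, URL and target database are placeholders,
+not real locations):
+
+ cd Nominatim/data
+ wget https://nominatim.org/data/<wikipedia-testdb-dump>.sql.gz
+ gzip -dc <wikipedia-testdb-dump>.sql.gz | psql <your-test-database>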
+
+The official test dataset is derived from the 180924 planet. Newer
planets are likely to work as well but you may see isolated test
failures where the data has changed. To recreate the input data
for the test database run: