X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/7879ad44cd1e36ae1235c3dcdfd9e477491813e4..9a3bc9cc1e00ffb0e376fe1dd4eda872f967066b:/nominatim/index.c diff --git a/nominatim/index.c b/nominatim/index.c index 253b4f20..c16aba9e 100644 --- a/nominatim/index.c +++ b/nominatim/index.c @@ -21,34 +21,255 @@ extern int verbose; -void nominatim_index(int rank_min, int rank_max, int num_threads, const char *conninfo, const char *structuredoutputfile) +void run_indexing(int rank, int interpolation, PGconn *conn, int num_threads, +struct index_thread_data * thread_data, const char *structuredoutputfile) { - struct index_thread_data * thread_data; - pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER; int tuples, count, sleepcount; - + pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER; + time_t rankStartTime; int rankTotalTuples; int rankCountTuples; float rankPerSecond; - - PGconn *conn; - PGresult * res; + PGresult * resSectors; PGresult * resPlaces; PGresult * resNULL; - - int rank; + int i; int iSector; int iResult; - + const char *paramValues[2]; int paramLengths[2]; int paramFormats[2]; uint32_t paramRank; uint32_t paramSector; uint32_t sector; + + xmlTextWriterPtr writer; + pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER; + + // Create the output file + writer = NULL; + if (structuredoutputfile) + { + writer = nominatim_exportXMLStart(structuredoutputfile); + } + + if (interpolation) + { + fprintf(stderr, "Starting interpolation lines (location_property_osmline)\n"); + } + else + { + fprintf(stderr, "Starting rank %d\n", rank); + } + + rankCountTuples = 0; + rankPerSecond = 0; + + paramRank = PGint32(rank); + paramValues[0] = (char *)¶mRank; + paramLengths[0] = sizeof(paramRank); + paramFormats[0] = 1; + + if (interpolation) + { + resSectors = PQexecPrepared(conn, "index_sectors_osmline", 0, NULL, 0, NULL, 1); + } + else + { + resSectors = PQexecPrepared(conn, "index_sectors", 1, paramValues, paramLengths, paramFormats, 1); + } + if (PQresultStatus(resSectors) != PGRES_TUPLES_OK) + { + fprintf(stderr, "index_sectors: SELECT failed: %s", PQerrorMessage(conn)); + PQclear(resSectors); + exit(EXIT_FAILURE); + } + if (PQftype(resSectors, 0) != PG_OID_INT4) + { + fprintf(stderr, "Sector value has unexpected type\n"); + PQclear(resSectors); + exit(EXIT_FAILURE); + } + if (PQftype(resSectors, 1) != PG_OID_INT8) + { + fprintf(stderr, "Sector value has unexpected type\n"); + PQclear(resSectors); + exit(EXIT_FAILURE); + } + + rankTotalTuples = 0; + for (iSector = 0; iSector < PQntuples(resSectors); iSector++) + { + rankTotalTuples += PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1))); + } + + rankStartTime = time(0); + for (iSector = 0; iSector <= PQntuples(resSectors); iSector++) + { + if (iSector > 0) + { + resPlaces = PQgetResult(conn); + if (PQresultStatus(resPlaces) != PGRES_TUPLES_OK) + { + fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn)); + PQclear(resPlaces); + exit(EXIT_FAILURE); + } + if (PQftype(resPlaces, 0) != PG_OID_INT8) + { + fprintf(stderr, "Place_id value has unexpected type\n"); + PQclear(resPlaces); + exit(EXIT_FAILURE); + } + resNULL = PQgetResult(conn); + if (resNULL != NULL) + { + fprintf(stderr, "Unexpected non-null response\n"); + exit(EXIT_FAILURE); + } + } + + if (iSector < PQntuples(resSectors)) + { + sector = PGint32(*((uint32_t *)PQgetvalue(resSectors, iSector, 0))); +// fprintf(stderr, "\n Starting sector %d size %ld\n", sector, PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1)))); + + // Get all the place_id's for this sector + paramRank = PGint32(rank); + paramSector = PGint32(sector); + if (rankTotalTuples-rankCountTuples < num_threads*1000) + { + // no sectors + if (interpolation) + { + iResult = PQsendQueryPrepared(conn, "index_nosector_places_osmline", 0, NULL, 0, NULL, 1); + } + else + { + paramValues[0] = (char *)¶mRank; + paramLengths[0] = sizeof(paramRank); + paramFormats[0] = 1; + iResult = PQsendQueryPrepared(conn, "index_nosector_places", 1, paramValues, paramLengths, paramFormats, 1); + } + } + else + { + if (interpolation) + { + iResult = PQsendQueryPrepared(conn, "index_sector_places_osmline", 1, paramValues, paramLengths, paramFormats, 1); + paramValues[0] = (char *)¶mSector; + paramLengths[0] = sizeof(paramSector); + paramFormats[0] = 1; + } + else + { + paramValues[0] = (char *)¶mRank; + paramLengths[0] = sizeof(paramRank); + paramFormats[0] = 1; + paramValues[1] = (char *)¶mSector; + paramLengths[1] = sizeof(paramSector); + paramFormats[1] = 1; + iResult = PQsendQueryPrepared(conn, "index_sector_places", 2, paramValues, paramLengths, paramFormats, 1); + } + } + if (!iResult) + { + fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn)); + PQclear(resPlaces); + exit(EXIT_FAILURE); + } + } + if (iSector > 0) + { + count = 0; + rankPerSecond = 0; + tuples = PQntuples(resPlaces); + + if (tuples > 0) + { + // Spawn threads + for (i = 0; i < num_threads; i++) + { + thread_data[i].res = resPlaces; + thread_data[i].tuples = tuples; + thread_data[i].count = &count; + thread_data[i].count_mutex = &count_mutex; + thread_data[i].writer = writer; + thread_data[i].writer_mutex = &writer_mutex; + if (interpolation) + { + thread_data[i].table = 0; // use interpolations table + } + else + { + thread_data[i].table = 1; // use placex table + } + pthread_create(&thread_data[i].thread, NULL, &nominatim_indexThread, (void *)&thread_data[i]); + } + + // Monitor threads to give user feedback + sleepcount = 0; + while (count < tuples) + { + usleep(1000); + + // Aim for one update per second + if (sleepcount++ > 1000) + { + rankPerSecond = ((float)rankCountTuples + (float)count) / MAX(difftime(time(0), rankStartTime),1); + if(interpolation) + { + fprintf(stderr, " Done %i in %i @ %f per second - Interpolation lines ETA (seconds): %f\n", (rankCountTuples + count), (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - (rankCountTuples + count)))/rankPerSecond); + } + else + { + fprintf(stderr, " Done %i in %i @ %f per second - Rank %i ETA (seconds): %f\n", (rankCountTuples + count), (int)(difftime(time(0), rankStartTime)), rankPerSecond, rank, ((float)(rankTotalTuples - (rankCountTuples + count)))/rankPerSecond); + } + + sleepcount = 0; + } + } + + // Wait for everything to finish + for (i = 0; i < num_threads; i++) + { + pthread_join(thread_data[i].thread, NULL); + } + + rankCountTuples += tuples; + } + + // Finished sector + rankPerSecond = (float)rankCountTuples / MAX(difftime(time(0), rankStartTime),1); + fprintf(stderr, " Done %i in %i @ %f per second - ETA (seconds): %f\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - rankCountTuples))/rankPerSecond); + + PQclear(resPlaces); + } + if (rankTotalTuples-rankCountTuples < num_threads*20 && iSector < PQntuples(resSectors)) + { + iSector = PQntuples(resSectors) - 1; + } + } + // Finished rank + fprintf(stderr, "\r Done %i in %i @ %f per second - FINISHED\n\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond); + + PQclear(resSectors); +} + +void nominatim_index(int rank_min, int rank_max, int num_threads, const char *conninfo, const char *structuredoutputfile) +{ + struct index_thread_data * thread_data; + + PGconn *conn; + PGresult * res; + + int rank; + + int i; xmlTextWriterPtr writer; pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -155,7 +376,7 @@ void nominatim_index(int rank_min, int rank_max, int num_threads, const char *co 1, pg_prepare_params); if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "Failed preparing index_placex: %s\n", PQerrorMessage(conn)); + fprintf(stderr, "Failed preparing index_placex: %s\n", PQerrorMessage(thread_data[i].conn)); exit(EXIT_FAILURE); } PQclear(res); @@ -166,348 +387,42 @@ void nominatim_index(int rank_min, int rank_max, int num_threads, const char *co 1, pg_prepare_params); if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "Failed preparing index_osmline: %s\n", PQerrorMessage(conn)); + fprintf(stderr, "Failed preparing index_osmline: %s\n", PQerrorMessage(thread_data[i].conn)); exit(EXIT_FAILURE); } PQclear(res); - /*res = PQexec(thread_data[i].conn, "set enable_seqscan = false"); + // Make sure the error message is not localized as we parse it later. + res = PQexec(thread_data[i].conn, "SET lc_messages TO 'C'"); if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "Failed disabling sequential scan: %s\n", PQerrorMessage(conn)); + fprintf(stderr, "Failed to set langauge: %s\n", PQerrorMessage(thread_data[i].conn)); exit(EXIT_FAILURE); } - PQclear(res);*/ + PQclear(res); nominatim_exportCreatePreparedQueries(thread_data[i].conn); } - // Create the output file - writer = NULL; - if (structuredoutputfile) - { - writer = nominatim_exportXMLStart(structuredoutputfile); - } fprintf(stderr, "Starting indexing rank (%i to %i) using %i threads\n", rank_min, rank_max, num_threads); - // first for the placex table for (rank = rank_min; rank <= rank_max; rank++) { // OSMLINE: do reindexing (=> reparenting) for interpolation lines at rank 30, but before all other objects of rank 30 // reason: houses (rank 30) depend on the updated interpolation line, when reparenting (see placex_update in functions.sql) if (rank == 30) { - fprintf(stderr, "Starting indexing interpolation lines (location_property_osmline)\n"); - rankCountTuples = 0; - rankTotalTuples = 0; - resSectors = PQexecPrepared(conn, "index_sectors_osmline", 0, NULL, 0, NULL, 1); - if (PQresultStatus(resSectors) != PGRES_TUPLES_OK) - { - fprintf(stderr, "index_sectors_osmline: SELECT failed: %s", PQerrorMessage(conn)); - PQclear(resSectors); - exit(EXIT_FAILURE); - } - if (PQftype(resSectors, 0) != PG_OID_INT4) - { - fprintf(stderr, "Sector value has unexpected type\n"); - PQclear(resSectors); - exit(EXIT_FAILURE); - } - if (PQftype(resSectors, 1) != PG_OID_INT8) - { - fprintf(stderr, "Sector value has unexpected type\n"); - PQclear(resSectors); - exit(EXIT_FAILURE); - } - rankStartTime = time(0); - for (iSector = 0; iSector < PQntuples(resSectors); iSector++) - { - rankTotalTuples += PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1))); - } - // do it only if tuples with indexed_status > 0 were found in osmline - int nTuples = PQntuples(resSectors); - if (nTuples > 0) - { - for (iSector = 0; iSector <= nTuples; iSector++) - { - if (iSector > 0) - { - resPlaces = PQgetResult(conn); - if (PQresultStatus(resPlaces) != PGRES_TUPLES_OK) - { - fprintf(stderr, "index_sector_places: SELECT failed: %s\n", PQerrorMessage(conn)); - PQclear(resPlaces); - exit(EXIT_FAILURE); - } - if (PQftype(resPlaces, 0) != PG_OID_INT8) - { - fprintf(stderr, "Place_id value has unexpected type\n"); - PQclear(resPlaces); - exit(EXIT_FAILURE); - } - resNULL = PQgetResult(conn); - if (resNULL != NULL) - { - fprintf(stderr, "Unexpected non-null response\n"); - exit(EXIT_FAILURE); - } - } - - if (iSector < nTuples) - { - sector = PGint32(*((uint32_t *)PQgetvalue(resSectors, iSector, 0))); - // fprintf(stderr, "\n Starting sector %d size %ld\n", sector, PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1)))); - - // Get all the place_id's for this sector - paramSector = PGint32(sector); - paramValues[0] = (char *)¶mSector; - paramLengths[0] = sizeof(paramSector); - paramFormats[0] = 1; - if (rankTotalTuples-rankCountTuples < num_threads*1000) - { - // no sectors - iResult = PQsendQueryPrepared(conn, "index_nosector_places_osmline", 0, NULL, 0, NULL, 1); - } - else - { - iResult = PQsendQueryPrepared(conn, "index_sector_places_osmline", 1, paramValues, paramLengths, paramFormats, 1); - } - if (!iResult) - { - fprintf(stderr, "index_sector_places_osmline: SELECT failed: %s", PQerrorMessage(conn)); - PQclear(resPlaces); - exit(EXIT_FAILURE); - } - } - if (iSector > 0) - { - count = 0; - rankPerSecond = 0; - tuples = PQntuples(resPlaces); - - if (tuples > 0) - { - // Spawn threads - for (i = 0; i < num_threads; i++) - { - thread_data[i].res = resPlaces; - thread_data[i].tuples = tuples; - thread_data[i].count = &count; - thread_data[i].count_mutex = &count_mutex; - thread_data[i].writer = writer; - thread_data[i].writer_mutex = &writer_mutex; - thread_data[i].table = 0; // use osmline table - pthread_create(&thread_data[i].thread, NULL, &nominatim_indexThread, (void *)&thread_data[i]); - } - // Monitor threads to give user feedback - sleepcount = 0; - while (count < tuples) - { - usleep(1000); - - // Aim for one update per second - if (sleepcount++ > 500) - { - rankPerSecond = ((float)rankCountTuples + (float)count) / MAX(difftime(time(0), rankStartTime),1); - fprintf(stderr, " Done %i in %i @ %f per second - Interpolation Lines ETA (seconds): %f\n", (rankCountTuples + count), (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - (rankCountTuples + count)))/(float)rankPerSecond); - sleepcount = 0; - } - } - - // Wait for everything to finish - for (i = 0; i < num_threads; i++) - { - pthread_join(thread_data[i].thread, NULL); - } - rankCountTuples += tuples; - } - // Finished sector - rankPerSecond = (float)rankCountTuples / MAX(difftime(time(0), rankStartTime),1); - fprintf(stderr, " Done %i in %i @ %f per second - ETA (seconds): %f\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - rankCountTuples))/rankPerSecond); - PQclear(resPlaces); - } - if (rankTotalTuples-rankCountTuples < num_threads*20 && iSector < nTuples) - { - iSector = nTuples - 1; - } - } - PQclear(resSectors); - } - // Finished rank - fprintf(stderr, "\r Done %i tuples in %i seconds- FINISHED\n", rankCountTuples,(int)(difftime(time(0), rankStartTime))); - if (writer) - { - nominatim_exportXMLEnd(writer); - } - } - fprintf(stderr, "Starting rank %d\n", rank); - rankCountTuples = 0; - rankPerSecond = 0; - - paramRank = PGint32(rank); - paramValues[0] = (char *)¶mRank; - paramLengths[0] = sizeof(paramRank); - paramFormats[0] = 1; -// if (rank < 16) -// resSectors = PQexecPrepared(conn, "index_nosectors", 1, paramValues, paramLengths, paramFormats, 1); -// else - resSectors = PQexecPrepared(conn, "index_sectors", 1, paramValues, paramLengths, paramFormats, 1); - - if (PQresultStatus(resSectors) != PGRES_TUPLES_OK) - { - fprintf(stderr, "index_sectors: SELECT failed: %s", PQerrorMessage(conn)); - PQclear(resSectors); - exit(EXIT_FAILURE); - } - if (PQftype(resSectors, 0) != PG_OID_INT4) - { - fprintf(stderr, "Sector value has unexpected type\n"); - PQclear(resSectors); - exit(EXIT_FAILURE); - } - if (PQftype(resSectors, 1) != PG_OID_INT8) - { - fprintf(stderr, "Sector value has unexpected type\n"); - PQclear(resSectors); - exit(EXIT_FAILURE); - } - - rankTotalTuples = 0; - for (iSector = 0; iSector < PQntuples(resSectors); iSector++) - { - rankTotalTuples += PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1))); - } - - rankStartTime = time(0); - - for (iSector = 0; iSector <= PQntuples(resSectors); iSector++) - { - if (iSector > 0) - { - resPlaces = PQgetResult(conn); - if (PQresultStatus(resPlaces) != PGRES_TUPLES_OK) - { - fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn)); - PQclear(resPlaces); - exit(EXIT_FAILURE); - } - if (PQftype(resPlaces, 0) != PG_OID_INT8) - { - fprintf(stderr, "Place_id value has unexpected type\n"); - PQclear(resPlaces); - exit(EXIT_FAILURE); - } - resNULL = PQgetResult(conn); - if (resNULL != NULL) - { - fprintf(stderr, "Unexpected non-null response\n"); - exit(EXIT_FAILURE); - } - } - - if (iSector < PQntuples(resSectors)) - { - sector = PGint32(*((uint32_t *)PQgetvalue(resSectors, iSector, 0))); -// fprintf(stderr, "\n Starting sector %d size %ld\n", sector, PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1)))); - - // Get all the place_id's for this sector - paramRank = PGint32(rank); - paramValues[0] = (char *)¶mRank; - paramLengths[0] = sizeof(paramRank); - paramFormats[0] = 1; - paramSector = PGint32(sector); - paramValues[1] = (char *)¶mSector; - paramLengths[1] = sizeof(paramSector); - paramFormats[1] = 1; - if (rankTotalTuples-rankCountTuples < num_threads*1000) - { - iResult = PQsendQueryPrepared(conn, "index_nosector_places", 1, paramValues, paramLengths, paramFormats, 1); - } - else - { - iResult = PQsendQueryPrepared(conn, "index_sector_places", 2, paramValues, paramLengths, paramFormats, 1); - } - if (!iResult) - { - fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn)); - PQclear(resPlaces); - exit(EXIT_FAILURE); - } - } - - if (iSector > 0) - { - count = 0; - rankPerSecond = 0; - tuples = PQntuples(resPlaces); - - if (tuples > 0) - { - // Spawn threads - for (i = 0; i < num_threads; i++) - { - thread_data[i].res = resPlaces; - thread_data[i].tuples = tuples; - thread_data[i].count = &count; - thread_data[i].count_mutex = &count_mutex; - thread_data[i].writer = writer; - thread_data[i].writer_mutex = &writer_mutex; - thread_data[i].table = 1; // use placex table - pthread_create(&thread_data[i].thread, NULL, &nominatim_indexThread, (void *)&thread_data[i]); - } - - // Monitor threads to give user feedback - sleepcount = 0; - while (count < tuples) - { - usleep(1000); - - // Aim for one update per second - if (sleepcount++ > 500) - { - rankPerSecond = ((float)rankCountTuples + (float)count) / MAX(difftime(time(0), rankStartTime),1); - fprintf(stderr, " Done %i in %i @ %f per second - Rank %i ETA (seconds): %f\n", (rankCountTuples + count), (int)(difftime(time(0), rankStartTime)), rankPerSecond, rank, ((float)(rankTotalTuples - (rankCountTuples + count)))/rankPerSecond); - sleepcount = 0; - } - } - - // Wait for everything to finish - for (i = 0; i < num_threads; i++) - { - pthread_join(thread_data[i].thread, NULL); - } - - rankCountTuples += tuples; - } - - // Finished sector - rankPerSecond = (float)rankCountTuples / MAX(difftime(time(0), rankStartTime),1); - fprintf(stderr, " Done %i in %i @ %f per second - ETA (seconds): %f\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - rankCountTuples))/rankPerSecond); - - PQclear(resPlaces); - } - if (rankTotalTuples-rankCountTuples < num_threads*20 && iSector < PQntuples(resSectors)) - { - iSector = PQntuples(resSectors) - 1; - } - } - // Finished rank - fprintf(stderr, "\r Done %i in %i @ %f per second - FINISHED \n\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond); - - PQclear(resSectors); - } - - - if (rank_max == 30) - { - // Close all connections - for (i = 0; i < num_threads; i++) - { - PQfinish(thread_data[i].conn); + run_indexing(rank, 1, conn, num_threads, thread_data, structuredoutputfile); } - PQfinish(conn); + run_indexing(rank, 0, conn, num_threads, thread_data, structuredoutputfile); } + // Close all connections + for (i = 0; i < num_threads; i++) + { + PQfinish(thread_data[i].conn); + } + PQfinish(conn); } void *nominatim_indexThread(void * thread_data_in) @@ -523,9 +438,9 @@ void *nominatim_indexThread(void * thread_data_in) uint64_t paramPlaceID; uint64_t place_id; time_t updateStartTime; - uint table; + unsigned table; - table = (uint)(thread_data->table); + table = thread_data->table; while (1) {