2 * triggers indexing (reparenting etc.) by resetting indexed_status: update placex/osmline set indexed_status = 0 where indexed_status > 0
3 * triggers placex_update and osmline_update
17 #include "nominatim.h"
20 #include "postgresql.h"
// nominatim_index: drives indexing of rows whose indexed_status > 0, one
// rank_search value at a time from rank_min to rank_max.
//
//   rank_min, rank_max    inclusive range of rank_search values to process
//   num_threads           number of worker threads (and worker connections)
//   conninfo              connection string handed to PQconnectdb for the main
//                         connection and for every worker connection
//   structuredoutputfile  when non-NULL, passed to nominatim_exportXMLStart to
//                         open an export writer that is shared by all workers
//
// The main connection only runs SELECTs (sector lists and place_id lists);
// the UPDATE statements are prepared on the worker connections and are
// executed by nominatim_indexThread.
24 void nominatim_index(int rank_min, int rank_max, int num_threads, const char *conninfo, const char *structuredoutputfile)
// One index_thread_data entry per worker; count is the shared progress
// counter that is handed to every worker together with count_mutex.
26 struct index_thread_data * thread_data;
27 pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
28 int tuples, count, sleepcount;
37 PGresult * resSectors;
// Up to two bound parameters are used by the prepared SELECTs (rank, sector).
46 const char *paramValues[2];
// Export writer and the mutex that is handed to the workers alongside it.
53 xmlTextWriterPtr writer;
54 pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER;
// Parameter type OIDs passed to each PQprepare call.
56 Oid pg_prepare_params[2];
// Main connection, used for all SELECTs issued from this function.
58 conn = PQconnectdb(conninfo);
59 if (PQstatus(conn) != CONNECTION_OK)
61 fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
// index_sectors: per-sector count of pending placex rows for one rank ($1).
65 pg_prepare_params[0] = PG_OID_INT4;
66 res = PQprepare(conn, "index_sectors",
67 "select geometry_sector,count(*) from placex where rank_search = $1 and indexed_status > 0 group by geometry_sector order by geometry_sector",
68 1, pg_prepare_params);
69 if (PQresultStatus(res) != PGRES_COMMAND_OK)
71 fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
// index_sectors_osmline: same per-sector count for location_property_osmline
// (no rank filter in the query text).
76 res = PQprepare(conn, "index_sectors_osmline",
77 "select geometry_sector,count(*) from location_property_osmline where indexed_status > 0 group by geometry_sector order by geometry_sector",
79 if (PQresultStatus(res) != PGRES_COMMAND_OK)
// NOTE(review): the message says index_sectors, but the statement prepared
// just above is index_sectors_osmline.
81 fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
// index_nosectors: single-row variant returning 0 as the sector plus the
// total pending count for one rank ($1).
86 pg_prepare_params[0] = PG_OID_INT4;
87 res = PQprepare(conn, "index_nosectors",
88 "select 0::integer,count(*) from placex where rank_search = $1 and indexed_status > 0",
89 1, pg_prepare_params);
90 if (PQresultStatus(res) != PGRES_COMMAND_OK)
// NOTE(review): the message says index_sectors, but the statement prepared
// just above is index_nosectors.
92 fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
// index_sector_places: pending placex place_ids for rank $1 in sector $2.
97 pg_prepare_params[0] = PG_OID_INT4;
98 pg_prepare_params[1] = PG_OID_INT4;
99 res = PQprepare(conn, "index_sector_places",
100 "select place_id from placex where rank_search = $1 and geometry_sector = $2 and indexed_status > 0",
101 2, pg_prepare_params);
102 if (PQresultStatus(res) != PGRES_COMMAND_OK)
104 fprintf(stderr, "Failed preparing index_sector_places: %s\n", PQerrorMessage(conn));
// index_nosector_places: all pending placex place_ids for rank $1, ordered by
// sector.
109 pg_prepare_params[0] = PG_OID_INT4;
110 res = PQprepare(conn, "index_nosector_places",
111 "select place_id from placex where rank_search = $1 and indexed_status > 0 order by geometry_sector",
112 1, pg_prepare_params);
113 if (PQresultStatus(res) != PGRES_COMMAND_OK)
115 fprintf(stderr, "Failed preparing index_nosector_places: %s\n", PQerrorMessage(conn));
// index_sector_places_osmline: pending osmline place_ids in sector $1.
120 pg_prepare_params[0] = PG_OID_INT4;
121 res = PQprepare(conn, "index_sector_places_osmline",
122 "select place_id from location_property_osmline where geometry_sector = $1 and indexed_status > 0",
123 1, pg_prepare_params);
124 if (PQresultStatus(res) != PGRES_COMMAND_OK)
// NOTE(review): the message omits the _osmline suffix of the statement that
// was prepared just above.
126 fprintf(stderr, "Failed preparing index_sector_places: %s\n", PQerrorMessage(conn));
// index_nosector_places_osmline: all pending osmline place_ids, ordered by
// sector.
131 res = PQprepare(conn, "index_nosector_places_osmline",
132 "select place_id from location_property_osmline where indexed_status > 0 order by geometry_sector",
134 if (PQresultStatus(res) != PGRES_COMMAND_OK)
// NOTE(review): the message omits the _osmline suffix of the statement that
// was prepared just above.
136 fprintf(stderr, "Failed preparing index_nosector_places: %s\n", PQerrorMessage(conn));
141 // Build the data for each thread
// NOTE(review): confirm the malloc result is checked for NULL before use.
142 thread_data = (struct index_thread_data *)malloc(sizeof(struct index_thread_data)*num_threads);
143 for (i = 0; i < num_threads; i++)
// Each worker gets its own database connection.
145 thread_data[i].conn = PQconnectdb(conninfo);
146 if (PQstatus(thread_data[i].conn) != CONNECTION_OK)
148 fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(thread_data[i].conn));
// index_placex: the per-row UPDATE on placex; place_id is bound as INT8.
152 pg_prepare_params[0] = PG_OID_INT8;
153 res = PQprepare(thread_data[i].conn, "index_placex",
154 "update placex set indexed_status = 0 where place_id = $1",
155 1, pg_prepare_params);
156 if (PQresultStatus(res) != PGRES_COMMAND_OK)
// NOTE(review): the prepare ran on thread_data[i].conn, but the error text is
// read from the main connection (conn).
158 fprintf(stderr, "Failed preparing index_placex: %s\n", PQerrorMessage(conn));
// index_osmline: the per-row UPDATE on location_property_osmline.
163 pg_prepare_params[0] = PG_OID_INT8;
164 res = PQprepare(thread_data[i].conn, "index_osmline",
165 "update location_property_osmline set indexed_status = 0 where place_id = $1",
166 1, pg_prepare_params);
167 if (PQresultStatus(res) != PGRES_COMMAND_OK)
// NOTE(review): same mismatch as above - the error text is read from conn,
// not from thread_data[i].conn.
169 fprintf(stderr, "Failed preparing index_osmline: %s\n", PQerrorMessage(conn));
174 /*res = PQexec(thread_data[i].conn, "set enable_seqscan = false");
175 if (PQresultStatus(res) != PGRES_COMMAND_OK)
177 fprintf(stderr, "Failed disabling sequential scan: %s\n", PQerrorMessage(conn));
// Presumably prepares the export-side queries on this worker connection;
// the function is defined outside this block.
182 nominatim_exportCreatePreparedQueries(thread_data[i].conn);
185 // Create the output file
187 if (structuredoutputfile)
189 writer = nominatim_exportXMLStart(structuredoutputfile);
192 fprintf(stderr, "Starting indexing rank (%i to %i) using %i threads\n", rank_min, rank_max, num_threads);
194 // first for the placex table
195 for (rank = rank_min; rank <= rank_max; rank++)
197 // OSMLINE: do reindexing (=> reparenting) for interpolation lines at rank 30, but before all other objects of rank 30
198 // reason: houses (rank 30) depend on the updated interpolation line, when reparenting (see placex_update in functions.sql)
201 fprintf(stderr, "Starting indexing interpolation lines (location_property_osmline)\n");
// Results are requested in binary format (final argument 1), which is why the
// values are read through PGint32 / PGint64 below (presumably byte-order
// conversion macros defined elsewhere - confirm).
204 resSectors = PQexecPrepared(conn, "index_sectors_osmline", 0, NULL, 0, NULL, 1);
205 if (PQresultStatus(resSectors) != PGRES_TUPLES_OK)
207 fprintf(stderr, "index_sectors_osmline: SELECT failed: %s", PQerrorMessage(conn));
// Column 0 is geometry_sector and must be INT4.
211 if (PQftype(resSectors, 0) != PG_OID_INT4)
213 fprintf(stderr, "Sector value has unexpected type\n");
// Column 1 is count(*) and must be INT8.
// NOTE(review): the message below still says "Sector value" although this
// check is on the count column.
217 if (PQftype(resSectors, 1) != PG_OID_INT8)
219 fprintf(stderr, "Sector value has unexpected type\n");
223 rankStartTime = time(0);
// Sum the per-sector counts to get the total number of pending rows.
224 for (iSector = 0; iSector < PQntuples(resSectors); iSector++)
226 rankTotalTuples += PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1)));
228 // do it only if tuples with indexed_status > 0 were found in osmline
229 int nTuples = PQntuples(resSectors);
// The bound is <= nTuples, i.e. one pass more than there are sectors: a pass
// collects the place list requested earlier (PQgetResult) and, while
// iSector < nTuples, sends the request for the next list asynchronously
// (PQsendQueryPrepared). Presumably the fetch is skipped on the very first
// pass - confirm against the guard around it.
232 for (iSector = 0; iSector <= nTuples; iSector++)
236 resPlaces = PQgetResult(conn);
237 if (PQresultStatus(resPlaces) != PGRES_TUPLES_OK)
// NOTE(review): message prefix says index_sector_places although this is the
// osmline phase.
239 fprintf(stderr, "index_sector_places: SELECT failed: %s\n", PQerrorMessage(conn));
243 if (PQftype(resPlaces, 0) != PG_OID_INT8)
245 fprintf(stderr, "Place_id value has unexpected type\n");
// A second PQgetResult is expected to return NULL, which marks the end of the
// previous query's results; anything else is reported as an error.
249 resNULL = PQgetResult(conn);
252 fprintf(stderr, "Unexpected non-null response\n");
257 if (iSector < nTuples)
259 sector = PGint32(*((uint32_t *)PQgetvalue(resSectors, iSector, 0)));
260 // fprintf(stderr, "\n Starting sector %d size %ld\n", sector, PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1))));
262 // Get all the place_id's for this sector
263 paramSector = PGint32(sector);
// NOTE(review): "¶mSector" looks like a mis-encoded "&paramSector" (the
// variable assigned on the previous line) - confirm and repair the encoding.
264 paramValues[0] = (char *)¶mSector;
265 paramLengths[0] = sizeof(paramSector);
// With fewer than num_threads*1000 rows left, the remaining places are
// requested in one query (the nosector variant) instead of sector by sector.
267 if (rankTotalTuples-rankCountTuples < num_threads*1000)
270 iResult = PQsendQueryPrepared(conn, "index_nosector_places_osmline", 0, NULL, 0, NULL, 1);
274 iResult = PQsendQueryPrepared(conn, "index_sector_places_osmline", 1, paramValues, paramLengths, paramFormats, 1);
278 fprintf(stderr, "index_sector_places_osmline: SELECT failed: %s", PQerrorMessage(conn));
// Number of place_ids in the fetched result; this is the workers' target.
287 tuples = PQntuples(resPlaces);
// Start the workers. All of them share the same result set, the same progress
// counter and the same writer; only conn and thread differ per entry.
292 for (i = 0; i < num_threads; i++)
294 thread_data[i].res = resPlaces;
295 thread_data[i].tuples = tuples;
296 thread_data[i].count = &count;
297 thread_data[i].count_mutex = &count_mutex;
298 thread_data[i].writer = writer;
299 thread_data[i].writer_mutex = &writer_mutex;
300 thread_data[i].table = 0; // use osmline table
301 pthread_create(&thread_data[i].thread, NULL, &nominatim_indexThread, (void *)&thread_data[i]);
303 // Monitor threads to give user feedback
// NOTE(review): count is written by the workers under count_mutex; confirm
// whether this read is meant to happen without taking the lock.
305 while (count < tuples)
309 // Aim for one update per second
310 if (sleepcount++ > 500)
// Throughput so far; the elapsed time is clamped to at least 1 via MAX so the
// division cannot be by zero.
312 rankPerSecond = ((float)rankCountTuples + (float)count) / MAX(difftime(time(0), rankStartTime),1);
313 fprintf(stderr, " Done %i in %i @ %f per second - Interpolation Lines ETA (seconds): %f\n", (rankCountTuples + count), (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - (rankCountTuples + count)))/(float)rankPerSecond);
318 // Wait for everything to finish
319 for (i = 0; i < num_threads; i++)
321 pthread_join(thread_data[i].thread, NULL);
// Account for the batch that was just completed.
323 rankCountTuples += tuples;
326 rankPerSecond = (float)rankCountTuples / MAX(difftime(time(0), rankStartTime),1);
327 fprintf(stderr, " Done %i in %i @ %f per second - ETA (seconds): %f\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - rankCountTuples))/rankPerSecond);
// With fewer than num_threads*20 rows left, jump to the last sector index so
// that the loop ends after its next pass.
330 if (rankTotalTuples-rankCountTuples < num_threads*20 && iSector < nTuples)
332 iSector = nTuples - 1;
338 fprintf(stderr, "\r Done %i tuples in %i seconds- FINISHED\n", rankCountTuples,(int)(difftime(time(0), rankStartTime)));
// NOTE(review): the writer is ended here, inside the rank loop, yet the placex
// phase below still hands writer to the workers - confirm this is intended.
341 nominatim_exportXMLEnd(writer);
344 fprintf(stderr, "Starting rank %d\n", rank);
// Bind the current rank as the single parameter of index_sectors.
348 paramRank = PGint32(rank);
// NOTE(review): "¶mRank" looks like a mis-encoded "&paramRank" - confirm.
349 paramValues[0] = (char *)¶mRank;
350 paramLengths[0] = sizeof(paramRank);
353 // resSectors = PQexecPrepared(conn, "index_nosectors", 1, paramValues, paramLengths, paramFormats, 1);
355 resSectors = PQexecPrepared(conn, "index_sectors", 1, paramValues, paramLengths, paramFormats, 1);
357 if (PQresultStatus(resSectors) != PGRES_TUPLES_OK)
359 fprintf(stderr, "index_sectors: SELECT failed: %s", PQerrorMessage(conn));
363 if (PQftype(resSectors, 0) != PG_OID_INT4)
365 fprintf(stderr, "Sector value has unexpected type\n");
// NOTE(review): as in the osmline phase, this check is on the count column
// but the message says "Sector value".
369 if (PQftype(resSectors, 1) != PG_OID_INT8)
371 fprintf(stderr, "Sector value has unexpected type\n");
// Total pending rows for this rank = sum of the per-sector counts.
377 for (iSector = 0; iSector < PQntuples(resSectors); iSector++)
379 rankTotalTuples += PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1)));
382 rankStartTime = time(0);
// Same shape as the osmline loop above: one pass more than there are sectors,
// fetching the previously requested place list while the next one is sent.
384 for (iSector = 0; iSector <= PQntuples(resSectors); iSector++)
388 resPlaces = PQgetResult(conn);
389 if (PQresultStatus(resPlaces) != PGRES_TUPLES_OK)
391 fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
395 if (PQftype(resPlaces, 0) != PG_OID_INT8)
397 fprintf(stderr, "Place_id value has unexpected type\n");
// Expect NULL here: it terminates the result stream of the previous query.
401 resNULL = PQgetResult(conn);
404 fprintf(stderr, "Unexpected non-null response\n");
409 if (iSector < PQntuples(resSectors))
411 sector = PGint32(*((uint32_t *)PQgetvalue(resSectors, iSector, 0)));
412 // fprintf(stderr, "\n Starting sector %d size %ld\n", sector, PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1))));
414 // Get all the place_id's for this sector
// Parameter 0 is the rank, parameter 1 the sector; the nosector query only
// consumes parameter 0.
415 paramRank = PGint32(rank);
416 paramValues[0] = (char *)¶mRank;
417 paramLengths[0] = sizeof(paramRank);
419 paramSector = PGint32(sector);
420 paramValues[1] = (char *)¶mSector;
421 paramLengths[1] = sizeof(paramSector);
// Fewer than num_threads*1000 rows left: fetch everything remaining for this
// rank at once instead of sector by sector.
423 if (rankTotalTuples-rankCountTuples < num_threads*1000)
425 iResult = PQsendQueryPrepared(conn, "index_nosector_places", 1, paramValues, paramLengths, paramFormats, 1);
429 iResult = PQsendQueryPrepared(conn, "index_sector_places", 2, paramValues, paramLengths, paramFormats, 1);
433 fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
443 tuples = PQntuples(resPlaces);
// Start the workers on the fetched place list, this time against placex.
448 for (i = 0; i < num_threads; i++)
450 thread_data[i].res = resPlaces;
451 thread_data[i].tuples = tuples;
452 thread_data[i].count = &count;
453 thread_data[i].count_mutex = &count_mutex;
454 thread_data[i].writer = writer;
455 thread_data[i].writer_mutex = &writer_mutex;
456 thread_data[i].table = 1; // use placex table
457 pthread_create(&thread_data[i].thread, NULL, &nominatim_indexThread, (void *)&thread_data[i]);
460 // Monitor threads to give user feedback
// NOTE(review): as above, confirm whether reading count without count_mutex
// is intended.
462 while (count < tuples)
466 // Aim for one update per second
467 if (sleepcount++ > 500)
469 rankPerSecond = ((float)rankCountTuples + (float)count) / MAX(difftime(time(0), rankStartTime),1);
470 fprintf(stderr, " Done %i in %i @ %f per second - Rank %i ETA (seconds): %f\n", (rankCountTuples + count), (int)(difftime(time(0), rankStartTime)), rankPerSecond, rank, ((float)(rankTotalTuples - (rankCountTuples + count)))/rankPerSecond);
475 // Wait for everything to finish
476 for (i = 0; i < num_threads; i++)
478 pthread_join(thread_data[i].thread, NULL);
481 rankCountTuples += tuples;
485 rankPerSecond = (float)rankCountTuples / MAX(difftime(time(0), rankStartTime),1);
486 fprintf(stderr, " Done %i in %i @ %f per second - ETA (seconds): %f\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - rankCountTuples))/rankPerSecond);
// Fewer than num_threads*20 rows left: skip to the last sector index so the
// loop terminates after its next pass.
490 if (rankTotalTuples-rankCountTuples < num_threads*20 && iSector < PQntuples(resSectors))
492 iSector = PQntuples(resSectors) - 1;
496 fprintf(stderr, "\r Done %i in %i @ %f per second - FINISHED \n\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond);
504 // Close all connections
505 for (i = 0; i < num_threads; i++)
507 PQfinish(thread_data[i].conn);
// nominatim_indexThread: worker entry point passed to pthread_create by
// nominatim_index.
//
//   thread_data_in  pointer to this worker's struct index_thread_data
//
// The worker claims a row index from the shared result set under count_mutex,
// reads the place_id at that index and executes the prepared index_placex or
// index_osmline UPDATE for it on its own connection. When a writer is set it
// also exports the place. Presumably this repeats until the shared counter
// reaches thread_data->tuples - confirm against the enclosing loop.
513 void *nominatim_indexThread(void * thread_data_in)
515 struct index_thread_data * thread_data = (struct index_thread_data * )thread_data_in;
// Filled by nominatim_exportPlaceQueries and released by
// nominatim_exportFreeQueries below.
516 struct export_data querySet;
// The UPDATE statements take a single bound parameter: the place_id.
520 const char *paramValues[1];
523 uint64_t paramPlaceID;
525 time_t updateStartTime;
// Selects the UPDATE statement: 1 means placex, anything else osmline.
528 table = (uint)(thread_data->table);
// The shared counter is both the progress indicator and the index of the next
// unclaimed row, so it is read and advanced while holding the mutex.
532 pthread_mutex_lock( thread_data->count_mutex );
// Nothing left to claim once the counter has reached the row count.
533 if (*(thread_data->count) >= thread_data->tuples)
535 pthread_mutex_unlock( thread_data->count_mutex );
// The result set is in binary format; PGint64 is presumably a byte-order
// conversion macro defined elsewhere - confirm.
539 place_id = PGint64(*((uint64_t *)PQgetvalue(thread_data->res, *thread_data->count, 0)));
540 (*thread_data->count)++;
542 pthread_mutex_unlock( thread_data->count_mutex );
// NOTE(review): %ld is used for place_id here and in the messages below; if
// place_id is a uint64_t, PRIu64 would be the matching format - confirm.
544 if (verbose) fprintf(stderr, " Processing place_id %ld\n", place_id);
546 updateStartTime = time(0);
// With export enabled, the export queries are run before the UPDATE and their
// results are kept in querySet (presumably to capture the pre-update state).
549 if (thread_data->writer)
551 nominatim_exportPlaceQueries(place_id, thread_data->conn, &querySet);
// Bind the place_id as a binary parameter for the UPDATE.
556 paramPlaceID = PGint64(place_id);
// NOTE(review): "¶mPlaceID" looks like a mis-encoded "&paramPlaceID" (the
// variable assigned on the previous line) - confirm and repair the encoding.
557 paramValues[0] = (char *)¶mPlaceID;
558 paramLengths[0] = sizeof(paramPlaceID);
560 if (table == 1) // table=1 for placex
562 res = PQexecPrepared(thread_data->conn, "index_placex", 1, paramValues, paramLengths, paramFormats, 1);
564 else // table=0 for osmline
566 res = PQexecPrepared(thread_data->conn, "index_osmline", 1, paramValues, paramLengths, paramFormats, 1);
// PGRES_COMMAND_OK means the UPDATE succeeded.
568 if (PQresultStatus(res) == PGRES_COMMAND_OK)
// A deadlock reported by the server is logged as "retrying"; any other
// failure is reported through the "UPDATE failed" messages further down.
// NOTE(review): the comparison length is 25 but the literal as written is 24
// characters long, so the literal's terminating NUL takes part in the
// comparison - confirm the literal's spacing matches the server message.
572 if (!strncmp(PQerrorMessage(thread_data->conn), "ERROR: deadlock detected", 25))
576 fprintf(stderr, "index_placex: UPDATE failed - deadlock, retrying (%ld)\n", place_id);
580 fprintf(stderr, "index_osmline: UPDATE failed - deadlock, retrying (%ld)\n", place_id);
589 fprintf(stderr, "index_placex: UPDATE failed: %s", PQerrorMessage(thread_data->conn));
593 fprintf(stderr, "index_osmline: UPDATE failed: %s", PQerrorMessage(thread_data->conn));
// Flag any place whose update took longer than one second.
601 if (difftime(time(0), updateStartTime) > 1) fprintf(stderr, " Slow place_id %ld\n", place_id);
// Write the place to the shared writer (writer_mutex is passed along for the
// callee to use), then release the query results gathered before the UPDATE.
603 if (thread_data->writer)
605 nominatim_exportPlace(place_id, thread_data->conn, thread_data->writer, thread_data->writer_mutex, &querySet);
606 nominatim_exportFreeQueries(&querySet);