]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/index.c
put indexing in update mode
[nominatim.git] / nominatim / index.c
1 /*
2 */
3
4 #include <stdio.h>
5 #include <unistd.h>
6 #include <stdlib.h>
7 #include <string.h>
8 #include <assert.h>
9 #include <pthread.h>
10 #include <time.h>
11 #include <stdint.h>
12
13 #include <libpq-fe.h>
14
15 #include "nominatim.h"
16 #include "index.h"
17 #include "export.h"
18 #include "postgresql.h"
19
20 extern int verbose;
21
22 void nominatim_index(int rank_min, int rank_max, int num_threads, const char *conninfo, const char *structuredoutputfile)
23 {
24     struct index_thread_data * thread_data;
25     pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
26     int tuples, count, sleepcount;
27
28     time_t rankStartTime;
29     int rankTotalTuples;
30     int rankCountTuples;
31     float rankPerSecond;
32
33     PGconn *conn;
34     PGresult * res;
35     PGresult * resSectors;
36     PGresult * resPlaces;
37     PGresult * resNULL;
38
39     int rank;
40     int i;
41     int iSector;
42     int iResult;
43
44     const char *paramValues[2];
45     int         paramLengths[2];
46     int         paramFormats[2];
47     uint32_t    paramRank;
48     uint32_t    paramSector;
49     uint32_t    sector;
50
51     xmlTextWriterPtr writer;
52     pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER;
53
54     Oid pg_prepare_params[2];
55
56     conn = PQconnectdb(conninfo);
57     if (PQstatus(conn) != CONNECTION_OK)
58     {
59         fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
60         exit(EXIT_FAILURE);
61     }
62
63     pg_prepare_params[0] = PG_OID_INT4;
64     res = PQprepare(conn, "index_sectors",
65                     "select geometry_sector,count(*) from placex where rank_search = $1 and indexed_status > 0 group by geometry_sector order by geometry_sector",
66                     1, pg_prepare_params);
67     if (PQresultStatus(res) != PGRES_COMMAND_OK)
68     {
69         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
70         exit(EXIT_FAILURE);
71     }
72     PQclear(res);
73
74     pg_prepare_params[0] = PG_OID_INT4;
75     res = PQprepare(conn, "index_nosectors",
76                     "select 0::integer,count(*) from placex where rank_search = $1 and indexed_status > 0",
77                     1, pg_prepare_params);
78     if (PQresultStatus(res) != PGRES_COMMAND_OK)
79     {
80         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
81         exit(EXIT_FAILURE);
82     }
83     PQclear(res);
84
85     pg_prepare_params[0] = PG_OID_INT4;
86     pg_prepare_params[1] = PG_OID_INT4;
87     res = PQprepare(conn, "index_sector_places",
88                     "select place_id from placex where rank_search = $1 and geometry_sector = $2 and indexed_status > 0",
89                     2, pg_prepare_params);
90     if (PQresultStatus(res) != PGRES_COMMAND_OK)
91     {
92         fprintf(stderr, "Failed preparing index_sector_places: %s\n", PQerrorMessage(conn));
93         exit(EXIT_FAILURE);
94     }
95     PQclear(res);
96
97     pg_prepare_params[0] = PG_OID_INT4;
98     res = PQprepare(conn, "index_nosector_places",
99                     "select place_id from placex where rank_search = $1 and indexed_status > 0 order by geometry_sector",
100                     1, pg_prepare_params);
101     if (PQresultStatus(res) != PGRES_COMMAND_OK)
102     {
103         fprintf(stderr, "Failed preparing index_nosector_places: %s\n", PQerrorMessage(conn));
104         exit(EXIT_FAILURE);
105     }
106     PQclear(res);
107
108     // Build the data for each thread
109     thread_data = (struct index_thread_data *)malloc(sizeof(struct index_thread_data)*num_threads);
110     for (i = 0; i < num_threads; i++)
111     {
112         thread_data[i].conn = PQconnectdb(conninfo);
113         if (PQstatus(thread_data[i].conn) != CONNECTION_OK)
114         {
115             fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(thread_data[i].conn));
116             exit(EXIT_FAILURE);
117         }
118
119         pg_prepare_params[0] = PG_OID_INT8;
120         res = PQprepare(thread_data[i].conn, "index_placex",
121                         "update placex set indexed_status = 0 where place_id = $1",
122                         1, pg_prepare_params);
123         if (PQresultStatus(res) != PGRES_COMMAND_OK)
124         {
125             fprintf(stderr, "Failed preparing index_placex: %s\n", PQerrorMessage(conn));
126             exit(EXIT_FAILURE);
127         }
128         PQclear(res);
129
130         /*res = PQexec(thread_data[i].conn, "set enable_seqscan = false");
131         if (PQresultStatus(res) != PGRES_COMMAND_OK)
132         {
133             fprintf(stderr, "Failed disabling sequential scan: %s\n", PQerrorMessage(conn));
134             exit(EXIT_FAILURE);
135         }
136         PQclear(res);*/
137
138         nominatim_exportCreatePreparedQueries(thread_data[i].conn);
139     }
140
141     // Create the output file
142     writer = NULL;
143     if (structuredoutputfile)
144     {
145         writer = nominatim_exportXMLStart(structuredoutputfile);
146     }
147
148     fprintf(stderr, "Starting indexing rank (%i to %i) using %i treads\n", rank_min, rank_max, num_threads);
149
150     for (rank = rank_min; rank <= rank_max; rank++)
151     {
152         fprintf(stderr, "Starting rank %d\n", rank);
153         rankCountTuples = 0;
154         rankPerSecond = 0;
155
156         paramRank = PGint32(rank);
157         paramValues[0] = (char *)&paramRank;
158         paramLengths[0] = sizeof(paramRank);
159         paramFormats[0] = 1;
160 //        if (rank < 16)
161 //            resSectors = PQexecPrepared(conn, "index_nosectors", 1, paramValues, paramLengths, paramFormats, 1);
162 //        else
163         resSectors = PQexecPrepared(conn, "index_sectors", 1, paramValues, paramLengths, paramFormats, 1);
164
165         if (PQresultStatus(resSectors) != PGRES_TUPLES_OK)
166         {
167             fprintf(stderr, "index_sectors: SELECT failed: %s", PQerrorMessage(conn));
168             PQclear(resSectors);
169             exit(EXIT_FAILURE);
170         }
171         if (PQftype(resSectors, 0) != PG_OID_INT4)
172         {
173             fprintf(stderr, "Sector value has unexpected type\n");
174             PQclear(resSectors);
175             exit(EXIT_FAILURE);
176         }
177         if (PQftype(resSectors, 1) != PG_OID_INT8)
178         {
179             fprintf(stderr, "Sector value has unexpected type\n");
180             PQclear(resSectors);
181             exit(EXIT_FAILURE);
182         }
183
184         rankTotalTuples = 0;
185         for (iSector = 0; iSector < PQntuples(resSectors); iSector++)
186         {
187             rankTotalTuples += PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1)));
188         }
189
190         rankStartTime = time(0);
191         for (iSector = 0; iSector <= PQntuples(resSectors); iSector++)
192         {
193             if (iSector > 0)
194             {
195                 resPlaces = PQgetResult(conn);
196                 if (PQresultStatus(resPlaces) != PGRES_TUPLES_OK)
197                 {
198                     fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
199                     PQclear(resPlaces);
200                     exit(EXIT_FAILURE);
201                 }
202                 if (PQftype(resPlaces, 0) != PG_OID_INT8)
203                 {
204                     fprintf(stderr, "Place_id value has unexpected type\n");
205                     PQclear(resPlaces);
206                     exit(EXIT_FAILURE);
207                 }
208                 resNULL = PQgetResult(conn);
209                 if (resNULL != NULL)
210                 {
211                     fprintf(stderr, "Unexpected non-null response\n");
212                     exit(EXIT_FAILURE);
213                 }
214             }
215
216             if (iSector < PQntuples(resSectors))
217             {
218                 sector = PGint32(*((uint32_t *)PQgetvalue(resSectors, iSector, 0)));
219 //                fprintf(stderr, "\n Starting sector %d size %ld\n", sector, PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1))));
220
221                 // Get all the place_id's for this sector
222                 paramRank = PGint32(rank);
223                 paramValues[0] = (char *)&paramRank;
224                 paramLengths[0] = sizeof(paramRank);
225                 paramFormats[0] = 1;
226                 paramSector = PGint32(sector);
227                 paramValues[1] = (char *)&paramSector;
228                 paramLengths[1] = sizeof(paramSector);
229                 paramFormats[1] = 1;
230                 if (rankTotalTuples-rankCountTuples < num_threads*1000)
231                 {
232                     iResult = PQsendQueryPrepared(conn, "index_nosector_places", 1, paramValues, paramLengths, paramFormats, 1);
233                 }
234                 else
235                 {
236                     iResult = PQsendQueryPrepared(conn, "index_sector_places", 2, paramValues, paramLengths, paramFormats, 1);
237                 }
238                 if (!iResult)
239                 {
240                     fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
241                     PQclear(resPlaces);
242                     exit(EXIT_FAILURE);
243                 }
244             }
245
246             if (iSector > 0)
247             {
248                 count = 0;
249                 rankPerSecond = 0;
250                 tuples = PQntuples(resPlaces);
251
252                 if (tuples > 0)
253                 {
254                     // Spawn threads
255                     for (i = 0; i < num_threads; i++)
256                     {
257                         thread_data[i].res = resPlaces;
258                         thread_data[i].tuples = tuples;
259                         thread_data[i].count = &count;
260                         thread_data[i].count_mutex = &count_mutex;
261                         thread_data[i].writer = writer;
262                         thread_data[i].writer_mutex = &writer_mutex;
263                         pthread_create(&thread_data[i].thread, NULL, &nominatim_indexThread, (void *)&thread_data[i]);
264                     }
265
266                     // Monitor threads to give user feedback
267                     sleepcount = 0;
268                     while (count < tuples)
269                     {
270                         usleep(1000);
271
272                         // Aim for one update per second
273                         if (sleepcount++ > 2000)
274                         {
275                             rankPerSecond = ((float)rankCountTuples + (float)count) / MAX(difftime(time(0), rankStartTime),1);
276                             fprintf(stderr, "  Done %i in %i @ %f per second - Rank %i ETA (seconds): %f\n", (rankCountTuples + count), (int)(difftime(time(0), rankStartTime)), rankPerSecond, rank, ((float)(rankTotalTuples - (rankCountTuples + count)))/rankPerSecond);
277                             sleepcount = 0;
278                         }
279                     }
280
281                     // Wait for everything to finish
282                     for (i = 0; i < num_threads; i++)
283                     {
284                         pthread_join(thread_data[i].thread, NULL);
285                     }
286
287                     rankCountTuples += tuples;
288                 }
289
290                 // Finished sector
291                 rankPerSecond = (float)rankCountTuples / MAX(difftime(time(0), rankStartTime),1);
292                 fprintf(stderr, "  Done %i in %i @ %f per second - ETA (seconds): %f\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - rankCountTuples))/rankPerSecond);
293
294                 PQclear(resPlaces);
295             }
296             if (rankTotalTuples-rankCountTuples < num_threads*20 && iSector < PQntuples(resSectors))
297             {
298                 iSector = PQntuples(resSectors) - 1;
299             }
300         }
301         // Finished rank
302         fprintf(stderr, "\r  Done %i in %i @ %f per second - FINISHED                      \n\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond);
303
304         PQclear(resSectors);
305     }
306
307     if (writer)
308     {
309         nominatim_exportXMLEnd(writer);
310     }
311 }
312
313 void *nominatim_indexThread(void * thread_data_in)
314 {
315     struct index_thread_data * thread_data = (struct index_thread_data * )thread_data_in;
316     struct export_data  querySet;
317
318     PGresult   *res;
319
320     const char *paramValues[1];
321     int         paramLengths[1];
322     int         paramFormats[1];
323     uint64_t    paramPlaceID;
324     uint64_t    place_id;
325     time_t              updateStartTime;
326
327     while (1)
328     {
329         pthread_mutex_lock( thread_data->count_mutex );
330         if (*(thread_data->count) >= thread_data->tuples)
331         {
332             pthread_mutex_unlock( thread_data->count_mutex );
333             break;
334         }
335
336         place_id = PGint64(*((uint64_t *)PQgetvalue(thread_data->res, *thread_data->count, 0)));
337         (*thread_data->count)++;
338
339         pthread_mutex_unlock( thread_data->count_mutex );
340
341         if (verbose) fprintf(stderr, "  Processing place_id %ld\n", place_id);
342
343         updateStartTime = time(0);
344         int done = 0;
345
346         if (thread_data->writer)
347         {
348              nominatim_exportPlaceQueries(place_id, thread_data->conn, &querySet);
349         }
350
351         while(!done)
352         {
353                 paramPlaceID = PGint64(place_id);
354                 paramValues[0] = (char *)&paramPlaceID;
355                 paramLengths[0] = sizeof(paramPlaceID);
356                 paramFormats[0] = 1;
357                 res = PQexecPrepared(thread_data->conn, "index_placex", 1, paramValues, paramLengths, paramFormats, 1);
358                 if (PQresultStatus(res) == PGRES_COMMAND_OK)
359                         done = 1;
360                 else
361                 {
362                         if (!strncmp(PQerrorMessage(thread_data->conn), "ERROR:  deadlock detected", 25))
363                         {
364                             fprintf(stderr, "index_placex: UPDATE failed - deadlock, retrying (%ld)\n", place_id);
365                             PQclear(res);
366                             sleep(rand() % 10);
367                         }
368                         else
369                         {
370                             fprintf(stderr, "index_placex: UPDATE failed: %s", PQerrorMessage(thread_data->conn));
371                             PQclear(res);
372                             sleep(rand() % 10);
373 //                          exit(EXIT_FAILURE);
374                         }
375                 }
376         }
377         PQclear(res);
378         if (difftime(time(0), updateStartTime) > 1) fprintf(stderr, "  Slow place_id %ld\n", place_id);
379
380         if (thread_data->writer)
381         {
382             nominatim_exportPlace(place_id, thread_data->conn, thread_data->writer, thread_data->writer_mutex, &querySet);
383             nominatim_exportFreeQueries(&querySet);
384         }
385     }
386
387     return NULL;
388 }