]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/index.c
fix structured and batch mode. Add constant to disable batch mode by default
[nominatim.git] / nominatim / index.c
1 /*
2 */
3
4 #include <stdio.h>
5 #include <unistd.h>
6 #include <stdlib.h>
7 #include <string.h>
8 #include <assert.h>
9 #include <pthread.h>
10 #include <time.h>
11 #include <stdint.h>
12
13 #include <libpq-fe.h>
14
15 #include "nominatim.h"
16 #include "index.h"
17 #include "export.h"
18 #include "postgresql.h"
19
20 extern int verbose;
21
22 void nominatim_index(int rank_min, int rank_max, int num_threads, const char *conninfo, const char *structuredoutputfile)
23 {
24     struct index_thread_data * thread_data;
25     pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
26     int tuples, count, sleepcount;
27
28     time_t rankStartTime;
29     int rankTotalTuples;
30     int rankCountTuples;
31     float rankPerSecond;
32
33     PGconn *conn;
34     PGresult * res;
35     PGresult * resSectors;
36     PGresult * resPlaces;
37     PGresult * resNULL;
38
39     int rank;
40     int i;
41     int iSector;
42     int iResult;
43
44     const char *paramValues[2];
45     int         paramLengths[2];
46     int         paramFormats[2];
47     uint32_t    paramRank;
48     uint32_t    paramSector;
49     uint32_t    sector;
50
51     xmlTextWriterPtr writer;
52     pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER;
53
54     Oid pg_prepare_params[2];
55
56     conn = PQconnectdb(conninfo);
57     if (PQstatus(conn) != CONNECTION_OK)
58     {
59         fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
60         exit(EXIT_FAILURE);
61     }
62
63     pg_prepare_params[0] = PG_OID_INT4;
64     res = PQprepare(conn, "index_sectors",
65                     "select geometry_sector,count(*) from placex where rank_search = $1 and indexed_status > 0 group by geometry_sector order by geometry_sector",
66                     1, pg_prepare_params);
67     if (PQresultStatus(res) != PGRES_COMMAND_OK)
68     {
69         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
70         exit(EXIT_FAILURE);
71     }
72     PQclear(res);
73
74     pg_prepare_params[0] = PG_OID_INT4;
75     res = PQprepare(conn, "index_nosectors",
76                     "select 0::integer,count(*) from placex where rank_search = $1 and indexed_status > 0",
77                     1, pg_prepare_params);
78     if (PQresultStatus(res) != PGRES_COMMAND_OK)
79     {
80         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
81         exit(EXIT_FAILURE);
82     }
83     PQclear(res);
84
85     pg_prepare_params[0] = PG_OID_INT4;
86     pg_prepare_params[1] = PG_OID_INT4;
87     res = PQprepare(conn, "index_sector_places",
88                     "select place_id from placex where rank_search = $1 and geometry_sector = $2 and indexed_status > 0",
89                     2, pg_prepare_params);
90     if (PQresultStatus(res) != PGRES_COMMAND_OK)
91     {
92         fprintf(stderr, "Failed preparing index_sector_places: %s\n", PQerrorMessage(conn));
93         exit(EXIT_FAILURE);
94     }
95     PQclear(res);
96
97     pg_prepare_params[0] = PG_OID_INT4;
98     res = PQprepare(conn, "index_nosector_places",
99                     "select place_id from placex where rank_search = $1 and indexed_status > 0 order by geometry_sector",
100                     1, pg_prepare_params);
101     if (PQresultStatus(res) != PGRES_COMMAND_OK)
102     {
103         fprintf(stderr, "Failed preparing index_nosector_places: %s\n", PQerrorMessage(conn));
104         exit(EXIT_FAILURE);
105     }
106     PQclear(res);
107
108     // Build the data for each thread
109     thread_data = (struct index_thread_data *)malloc(sizeof(struct index_thread_data)*num_threads);
110     for (i = 0; i < num_threads; i++)
111     {
112         thread_data[i].conn = PQconnectdb(conninfo);
113         if (PQstatus(thread_data[i].conn) != CONNECTION_OK)
114         {
115             fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(thread_data[i].conn));
116             exit(EXIT_FAILURE);
117         }
118
119         pg_prepare_params[0] = PG_OID_INT8;
120         res = PQprepare(thread_data[i].conn, "index_placex",
121                         "update placex set indexed_status = 0 where place_id = $1",
122                         1, pg_prepare_params);
123         if (PQresultStatus(res) != PGRES_COMMAND_OK)
124         {
125             fprintf(stderr, "Failed preparing index_placex: %s\n", PQerrorMessage(conn));
126             exit(EXIT_FAILURE);
127         }
128         PQclear(res);
129
130         /*res = PQexec(thread_data[i].conn, "set enable_seqscan = false");
131         if (PQresultStatus(res) != PGRES_COMMAND_OK)
132         {
133             fprintf(stderr, "Failed disabling sequential scan: %s\n", PQerrorMessage(conn));
134             exit(EXIT_FAILURE);
135         }
136         PQclear(res);*/
137
138         nominatim_exportCreatePreparedQueries(thread_data[i].conn);
139     }
140
141     // Create the output file
142     writer = NULL;
143     if (structuredoutputfile)
144     {
145         writer = nominatim_exportXMLStart(structuredoutputfile);
146     }
147
148     fprintf(stderr, "Starting indexing rank (%i to %i) using %i threads\n", rank_min, rank_max, num_threads);
149
150     for (rank = rank_min; rank <= rank_max; rank++)
151     {
152         fprintf(stderr, "Starting rank %d\n", rank);
153         rankCountTuples = 0;
154         rankPerSecond = 0;
155
156         paramRank = PGint32(rank);
157         paramValues[0] = (char *)&paramRank;
158         paramLengths[0] = sizeof(paramRank);
159         paramFormats[0] = 1;
160 //        if (rank < 16)
161 //            resSectors = PQexecPrepared(conn, "index_nosectors", 1, paramValues, paramLengths, paramFormats, 1);
162 //        else
163         resSectors = PQexecPrepared(conn, "index_sectors", 1, paramValues, paramLengths, paramFormats, 1);
164
165         if (PQresultStatus(resSectors) != PGRES_TUPLES_OK)
166         {
167             fprintf(stderr, "index_sectors: SELECT failed: %s", PQerrorMessage(conn));
168             PQclear(resSectors);
169             exit(EXIT_FAILURE);
170         }
171         if (PQftype(resSectors, 0) != PG_OID_INT4)
172         {
173             fprintf(stderr, "Sector value has unexpected type\n");
174             PQclear(resSectors);
175             exit(EXIT_FAILURE);
176         }
177         if (PQftype(resSectors, 1) != PG_OID_INT8)
178         {
179             fprintf(stderr, "Sector value has unexpected type\n");
180             PQclear(resSectors);
181             exit(EXIT_FAILURE);
182         }
183
184         rankTotalTuples = 0;
185         for (iSector = 0; iSector < PQntuples(resSectors); iSector++)
186         {
187             rankTotalTuples += PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1)));
188         }
189
190         rankStartTime = time(0);
191         for (iSector = 0; iSector <= PQntuples(resSectors); iSector++)
192         {
193             if (iSector > 0)
194             {
195                 resPlaces = PQgetResult(conn);
196                 if (PQresultStatus(resPlaces) != PGRES_TUPLES_OK)
197                 {
198                     fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
199                     PQclear(resPlaces);
200                     exit(EXIT_FAILURE);
201                 }
202                 if (PQftype(resPlaces, 0) != PG_OID_INT8)
203                 {
204                     fprintf(stderr, "Place_id value has unexpected type\n");
205                     PQclear(resPlaces);
206                     exit(EXIT_FAILURE);
207                 }
208                 resNULL = PQgetResult(conn);
209                 if (resNULL != NULL)
210                 {
211                     fprintf(stderr, "Unexpected non-null response\n");
212                     exit(EXIT_FAILURE);
213                 }
214             }
215
216             if (iSector < PQntuples(resSectors))
217             {
218                 sector = PGint32(*((uint32_t *)PQgetvalue(resSectors, iSector, 0)));
219 //                fprintf(stderr, "\n Starting sector %d size %ld\n", sector, PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1))));
220
221                 // Get all the place_id's for this sector
222                 paramRank = PGint32(rank);
223                 paramValues[0] = (char *)&paramRank;
224                 paramLengths[0] = sizeof(paramRank);
225                 paramFormats[0] = 1;
226                 paramSector = PGint32(sector);
227                 paramValues[1] = (char *)&paramSector;
228                 paramLengths[1] = sizeof(paramSector);
229                 paramFormats[1] = 1;
230                 if (rankTotalTuples-rankCountTuples < num_threads*1000)
231                 {
232                     iResult = PQsendQueryPrepared(conn, "index_nosector_places", 1, paramValues, paramLengths, paramFormats, 1);
233                 }
234                 else
235                 {
236                     iResult = PQsendQueryPrepared(conn, "index_sector_places", 2, paramValues, paramLengths, paramFormats, 1);
237                 }
238                 if (!iResult)
239                 {
240                     fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
241                     PQclear(resPlaces);
242                     exit(EXIT_FAILURE);
243                 }
244             }
245
246             if (iSector > 0)
247             {
248                 count = 0;
249                 rankPerSecond = 0;
250                 tuples = PQntuples(resPlaces);
251
252                 if (tuples > 0)
253                 {
254                     // Spawn threads
255                     for (i = 0; i < num_threads; i++)
256                     {
257                         thread_data[i].res = resPlaces;
258                         thread_data[i].tuples = tuples;
259                         thread_data[i].count = &count;
260                         thread_data[i].count_mutex = &count_mutex;
261                         thread_data[i].writer = writer;
262                         thread_data[i].writer_mutex = &writer_mutex;
263                         pthread_create(&thread_data[i].thread, NULL, &nominatim_indexThread, (void *)&thread_data[i]);
264                     }
265
266                     // Monitor threads to give user feedback
267                     sleepcount = 0;
268                     while (count < tuples)
269                     {
270                         usleep(1000);
271
272                         // Aim for one update per second
273                         if (sleepcount++ > 500)
274                         {
275                             rankPerSecond = ((float)rankCountTuples + (float)count) / MAX(difftime(time(0), rankStartTime),1);
276                             fprintf(stderr, "  Done %i in %i @ %f per second - Rank %i ETA (seconds): %f\n", (rankCountTuples + count), (int)(difftime(time(0), rankStartTime)), rankPerSecond, rank, ((float)(rankTotalTuples - (rankCountTuples + count)))/rankPerSecond);
277                             sleepcount = 0;
278                         }
279                     }
280
281                     // Wait for everything to finish
282                     for (i = 0; i < num_threads; i++)
283                     {
284                         pthread_join(thread_data[i].thread, NULL);
285                     }
286
287                     rankCountTuples += tuples;
288                 }
289
290                 // Finished sector
291                 rankPerSecond = (float)rankCountTuples / MAX(difftime(time(0), rankStartTime),1);
292                 fprintf(stderr, "  Done %i in %i @ %f per second - ETA (seconds): %f\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - rankCountTuples))/rankPerSecond);
293
294                 PQclear(resPlaces);
295             }
296             if (rankTotalTuples-rankCountTuples < num_threads*20 && iSector < PQntuples(resSectors))
297             {
298                 iSector = PQntuples(resSectors) - 1;
299             }
300         }
301         // Finished rank
302         fprintf(stderr, "\r  Done %i in %i @ %f per second - FINISHED                      \n\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond);
303
304         PQclear(resSectors);
305     }
306
307     if (writer)
308     {
309         nominatim_exportXMLEnd(writer);
310     }
311
312     // Close all connections
313     for (i = 0; i < num_threads; i++)
314     {
315         PQfinish(thread_data[i].conn);
316     }
317     PQfinish(conn);
318 }
319
320 void *nominatim_indexThread(void * thread_data_in)
321 {
322     struct index_thread_data * thread_data = (struct index_thread_data * )thread_data_in;
323     struct export_data  querySet;
324
325     PGresult   *res;
326
327     const char *paramValues[1];
328     int         paramLengths[1];
329     int         paramFormats[1];
330     uint64_t    paramPlaceID;
331     uint64_t    place_id;
332     time_t              updateStartTime;
333
334     while (1)
335     {
336         pthread_mutex_lock( thread_data->count_mutex );
337         if (*(thread_data->count) >= thread_data->tuples)
338         {
339             pthread_mutex_unlock( thread_data->count_mutex );
340             break;
341         }
342
343         place_id = PGint64(*((uint64_t *)PQgetvalue(thread_data->res, *thread_data->count, 0)));
344         (*thread_data->count)++;
345
346         pthread_mutex_unlock( thread_data->count_mutex );
347
348         if (verbose) fprintf(stderr, "  Processing place_id %ld\n", place_id);
349
350         updateStartTime = time(0);
351         int done = 0;
352
353         if (thread_data->writer)
354         {
355              nominatim_exportPlaceQueries(place_id, thread_data->conn, &querySet);
356         }
357
358         while(!done)
359         {
360                 paramPlaceID = PGint64(place_id);
361                 paramValues[0] = (char *)&paramPlaceID;
362                 paramLengths[0] = sizeof(paramPlaceID);
363                 paramFormats[0] = 1;
364                 res = PQexecPrepared(thread_data->conn, "index_placex", 1, paramValues, paramLengths, paramFormats, 1);
365                 if (PQresultStatus(res) == PGRES_COMMAND_OK)
366                         done = 1;
367                 else
368                 {
369                         if (!strncmp(PQerrorMessage(thread_data->conn), "ERROR:  deadlock detected", 25))
370                         {
371                             fprintf(stderr, "index_placex: UPDATE failed - deadlock, retrying (%ld)\n", place_id);
372                             PQclear(res);
373                             sleep(rand() % 10);
374                         }
375                         else
376                         {
377                             fprintf(stderr, "index_placex: UPDATE failed: %s", PQerrorMessage(thread_data->conn));
378                             PQclear(res);
379                             exit(EXIT_FAILURE);
380                         }
381                 }
382         }
383         PQclear(res);
384         if (difftime(time(0), updateStartTime) > 1) fprintf(stderr, "  Slow place_id %ld\n", place_id);
385
386         if (thread_data->writer)
387         {
388             nominatim_exportPlace(place_id, thread_data->conn, thread_data->writer, thread_data->writer_mutex, &querySet);
389             nominatim_exportFreeQueries(&querySet);
390         }
391     }
392
393     return NULL;
394 }