From dd43ee307485fbac969dac6944373b2158003211 Mon Sep 17 00:00:00 2001 From: Tom Hughes Date: Wed, 3 Jul 2013 22:54:46 +0100 Subject: [PATCH] Bring changeset replication task under chef control --- cookbooks/planet/recipes/replication.rb | 20 +++ .../templates/default/changesets.bin.erb | 152 ++++++++++++++++++ .../templates/default/changesets.conf.erb | 3 + .../templates/default/replication.cron.erb | 7 +- 4 files changed, 181 insertions(+), 1 deletion(-) create mode 100644 cookbooks/planet/templates/default/changesets.bin.erb create mode 100644 cookbooks/planet/templates/default/changesets.conf.erb diff --git a/cookbooks/planet/recipes/replication.rb b/cookbooks/planet/recipes/replication.rb index f055280f1..2d9dc4f4b 100644 --- a/cookbooks/planet/recipes/replication.rb +++ b/cookbooks/planet/recipes/replication.rb @@ -21,6 +21,18 @@ include_recipe "osmosis" db_passwords = data_bag_item("db", "passwords") +package "ruby" +package "ruby-libxml" + +gem_package "pg" + +template "/usr/local/bin/replicate-changesets" do + source "changesets.bin.erb" + owner "root" + group "root" + mode 0755 +end + directory "/etc/replication" do owner "root" group "root" @@ -35,6 +47,14 @@ template "/etc/replication/auth.conf" do variables :password => db_passwords["planetdiff"] end +template "/etc/replication/changesets.conf" do + source "changesets.conf.erb" + owner "root" + group "planet" + mode 0640 + variables :password => db_passwords["planetdiff"] +end + directory "/var/lib/replication" do owner "planet" group "planet" diff --git a/cookbooks/planet/templates/default/changesets.bin.erb b/cookbooks/planet/templates/default/changesets.bin.erb new file mode 100644 index 000000000..443d8ede3 --- /dev/null +++ b/cookbooks/planet/templates/default/changesets.bin.erb @@ -0,0 +1,152 @@ +#!/usr/bin/ruby + +require 'rubygems' +require 'pg' +require 'yaml' +require 'time' +require 'fileutils' +require 'xml/libxml' +require 'zlib' + +# after this many changes, a changeset will be closed 
+CHANGES_LIMIT=50000 + +# this is the scale factor for lat/lon values stored as integers in the database +GEO_SCALE=10000000 + +## +# changeset class keeps some information about changesets downloaded from the +# database - enough to let us know which changesets are closed/open & recently +# closed. +class Changeset + attr_reader :id, :created_at, :closed_at, :num_changes + + def initialize(row) + @id = row['id'].to_i + @created_at = Time.parse(row['created_at']) + @closed_at = Time.parse(row['closed_at']) + @num_changes = row['num_changes'].to_i + end + + def closed?(t) + (@closed_at < t) || (@num_changes >= CHANGES_LIMIT) + end + + def open?(t) + not closed?(t) + end + + def activity_between?(t1, t2) + ((@closed_at >= t1) && (@closed_at < t2)) || ((@created_at >= t1) && (@created_at < t2)) + end +end + +## +# state and connections associated with getting changeset data +# replicated to a file. +class Replicator + def initialize(config) + @config = YAML.load(File.read(config)) + @state = YAML.load(File.read(@config['state_file'])) + @conn = PGconn.connect(@config['db']) + @now = Time.now.getutc + end + + def open_changesets + last_run = @state['last_run'] + last_run = (@now - 60) if last_run.nil? + @state['last_run'] = @now + # pretty much all operations on a changeset will modify its closed_at + # time (see rails_port's changeset model). so it is probably enough + # for us to look at anything that was closed recently, and filter from + # there. + @conn. + exec("select id, created_at, closed_at, num_changes from changesets where closed_at > ((now() at time zone 'utc') - '1 hour'::interval)"). + map {|row| Changeset.new(row) }. + select {|cs| cs.activity_between?(last_run, @now) } + end + + # creates an XML file containing the changeset information from the + # list of changesets output by open_changesets. 
+ def changeset_dump(changesets) + doc = XML::Document.new + doc.root = XML::Node.new("osm") + { 'version' => '0.6', + 'generator' => 'replicate_changesets.rb', + 'copyright' => "OpenStreetMap and contributors", + 'attribution' => "http://www.openstreetmap.org/copyright", + 'license' => "http://opendatacommons.org/licenses/odbl/1-0/" }. + each { |k,v| doc.root[k] = v } + + changesets.each do |cs| + xml = XML::Node.new("changeset") + xml['id'] = cs.id.to_s + xml['created_at'] = cs.created_at.getutc.xmlschema + xml['closed_at'] = cs.closed_at.getutc.xmlschema if cs.closed?(@now) + xml['open'] = cs.open?(@now).to_s + + res = @conn.exec("select u.id, u.display_name, c.min_lat, c.max_lat, c.min_lon, c.max_lon from users u join changesets c on u.id=c.user_id where c.id=#{cs.id}") + xml['user'] = res[0]['display_name'] + xml['uid'] = res[0]['id'] + + unless (res[0]['min_lat'].nil? || + res[0]['max_lat'].nil? || + res[0]['min_lon'].nil? || + res[0]['max_lon'].nil?) + xml['min_lat'] = (res[0]['min_lat'].to_f / GEO_SCALE).to_s + xml['max_lat'] = (res[0]['max_lat'].to_f / GEO_SCALE).to_s + xml['min_lon'] = (res[0]['min_lon'].to_f / GEO_SCALE).to_s + xml['max_lon'] = (res[0]['max_lon'].to_f / GEO_SCALE).to_s + end + + res = @conn.exec("select k, v from changeset_tags where changeset_id=#{cs.id}") + res.each do |row| + tag = XML::Node.new("tag") + tag['k'] = row['k'] + tag['v'] = row['v'] + xml << tag + end + + doc.root << xml + end + + doc.to_s + end + + # saves new state (including the changeset dump xml) + def save! + File.open(@config['state_file'], "r") do |fl| + fl.flock(File::LOCK_EX) + + sequence = (@state.has_key?('sequence') ? 
@state['sequence'] + 1 : 0) + data_file = @config['data_dir'] + sprintf("/%03d/%03d/%03d.osm.gz", sequence / 1000000, (sequence / 1000) % 1000, (sequence % 1000)); + tmp_state = @config['state_file'] + ".tmp" + tmp_data = "/tmp/changeset_data.osm.tmp" + # try and write the files to tmp locations and then + # move them into place later, to avoid in-progress + # clashes, or people seeing incomplete files. + begin + FileUtils.mkdir_p(File.dirname(data_file)) + Zlib::GzipWriter.open(tmp_data) do |fh| + fh.write(changeset_dump(open_changesets)) + end + @state['sequence'] = sequence + File.open(tmp_state, "w") do |fh| + fh.write(YAML.dump(@state)) + end + FileUtils.mv(tmp_data, data_file) + FileUtils.mv(tmp_state, @config['state_file']) + fl.flock(File::LOCK_UN) + + rescue + STDERR.puts("Error! Couldn't update state.") + fl.flock(File::LOCK_UN) + raise + end + end + end +end + +rep = Replicator.new(ARGV[0]) +rep.save! + diff --git a/cookbooks/planet/templates/default/changesets.conf.erb b/cookbooks/planet/templates/default/changesets.conf.erb new file mode 100644 index 000000000..72ac2a067 --- /dev/null +++ b/cookbooks/planet/templates/default/changesets.conf.erb @@ -0,0 +1,3 @@ +state_file: /store/planet/replication/changesets/state.yaml +db: host=db dbname=openstreetmap user=planetdiff password=<%= @password %> +data_dir: /store/planet/replication/changesets diff --git a/cookbooks/planet/templates/default/replication.cron.erb b/cookbooks/planet/templates/default/replication.cron.erb index 6993b1505..dead114ef 100644 --- a/cookbooks/planet/templates/default/replication.cron.erb +++ b/cookbooks/planet/templates/default/replication.cron.erb @@ -1,8 +1,13 @@ # DO NOT EDIT - This file is being maintained by Chef -MAILTO=brett@bretth.com TZ=UTC +MAILTO=brett@bretth.com + * * * * * planet /usr/local/bin/osmosis -q --replicate-apidb authFile=/etc/replication/auth.conf validateSchemaVersion=false --write-replication workingDirectory=/store/planet/replication/minute 2 * * * * 
planet /usr/local/bin/osmosis -q --merge-replication-files workingDirectory=/var/lib/replication/hour 5 * * * * planet /usr/local/bin/osmosis -q --merge-replication-files workingDirectory=/var/lib/replication/day + +MAILTO=zerebubuth@gmail.com + +* * * * * planet /usr/local/bin/replicate-changesets /etc/replication/changesets.conf -- 2.39.5