Rails: Performing updates on many records faster

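In our Rails 3.2.13 app (Ruby 2.0.0 + Postgres on Heroku), we frequently retrieve a large amount of order data from an API, and then we need to update or create each order in our database, as well as its associations. A single order creates/updates itself plus approx. 10-15 associated objects, and we import up to 500 orders at a time.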

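The code below works, but the problem is that it isn't efficient at all in terms of speed. Creating/updating 500 records takes approx. 1 minute and generates 6,500+ DB queries!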

    def add_details(shop, shopify_orders)
      shopify_orders.each do |shopify_order|
        order = Order.where(:order_id => shopify_order.id.to_s, :shop_id => shop.id).first_or_create
        order.update_details(order, shopify_order, shop) # This calls update_attributes for the Order
        ShippingLine.add_details(order, shopify_order.shipping_lines)
        LineItem.add_details(order, shopify_order.line_items)
        Taxline.add_details(order, shopify_order.tax_lines)
        Fulfillment.add_details(order, shopify_order.fulfillments)
        Note.add_details(order, shopify_order.note_attributes)
        Discount.add_details(order, shopify_order.discount_codes)
        billing_address = shopify_order.billing_address rescue nil
        if !billing_address.blank?
          BillingAddress.add_details(order, billing_address)
        end
        shipping_address = shopify_order.shipping_address rescue nil
        if !shipping_address.blank?
          ShippingAddress.add_details(order, shipping_address)
        end
        payment_details = shopify_order.payment_details rescue nil
        if !payment_details.blank?
          PaymentDetail.add_details(order, payment_details)
        end
      end
    end

    def update_details(order, shopify_order, shop)
      order.update_attributes(
        :order_name => shopify_order.name,
        :order_created_at => shopify_order.created_at,
        :order_updated_at => shopify_order.updated_at,
        :status => Order.get_status(shopify_order),
        :payment_status => shopify_order.financial_status,
        :fulfillment_status => Order.get_fulfillment_status(shopify_order),
        :payment_method => shopify_order.processing_method,
        :gateway => shopify_order.gateway,
        :currency => shopify_order.currency,
        :subtotal_price => shopify_order.subtotal_price,
        :subtotal_tax => shopify_order.total_tax,
        :total_discounts => shopify_order.total_discounts,
        :total_line_items_price => shopify_order.total_line_items_price,
        :total_price => shopify_order.total_price,
        :total_tax => shopify_order.total_tax,
        :total_weight => shopify_order.total_weight,
        :taxes_included => shopify_order.taxes_included,
        :shop_id => shop.id,
        :email => shopify_order.email,
        :order_note => shopify_order.note
      )
    end

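As you can see, we loop through each order, find out whether it exists (then either load the existing order or create a new one), and then call update_attributes to pass in the order's details. After that, we create or update each of the associations. Each associated model looks very similar: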

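    class Taxline < ActiveRecord::Base
      def self.add_details(order, tax_lines)
        tax_lines.each do |shopify_tax_line|
          taxline = … order.id) # find or create the order's Taxline
          taxline.update_details(shopify_tax_line)
        end
      end
    end

    def update_details(tax_line)
      self.update_attributes(:price => tax_line.price, :rate => tax_line.rate, :title => tax_line.title)
    end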

I looked at the activerecord-import gem, but unfortunately it seems geared toward creating records in bulk, whereas we also need to update them.

What is the best way to improve performance here?

Many thanks in advance.

UPDATE:

I came up with this small improvement, which removes the call that updated newly created orders (one query fewer per order).

    def add_details(shop, shopify_orders)
      shopify_orders.each do |shopify_order|
        values = {
          :order_id => shopify_order.id.to_s,
          :shop_id => shop.id,
          :order_name => shopify_order.name,
          :order_created_at => shopify_order.created_at,
          :order_updated_at => shopify_order.updated_at,
          :status => Order.get_status(shopify_order),
          :payment_status => shopify_order.financial_status,
          :fulfillment_status => Order.get_fulfillment_status(shopify_order),
          :payment_method => shopify_order.processing_method,
          :gateway => shopify_order.gateway,
          :currency => shopify_order.currency,
          :subtotal_price => shopify_order.subtotal_price,
          :subtotal_tax => shopify_order.total_tax,
          :total_discounts => shopify_order.total_discounts,
          :total_line_items_price => shopify_order.total_line_items_price,
          :total_price => shopify_order.total_price,
          :total_tax => shopify_order.total_tax,
          :total_weight => shopify_order.total_weight,
          :taxes_included => shopify_order.taxes_included,
          :email => shopify_order.email,
          :order_note => shopify_order.note
        }
        get_order = Order.where(:order_id => shopify_order.id.to_s, :shop_id => shop.id)
        if get_order.blank?
          order = Order.create(values)
        else
          order = get_order.first
          order.update_attributes(values)
        end
        ShippingLine.add_details(order, shopify_order.shipping_lines)
        LineItem.add_details(order, shopify_order.line_items)
        Taxline.add_details(order, shopify_order.tax_lines)
        Fulfillment.add_details(order, shopify_order.fulfillments)
        Note.add_details(order, shopify_order.note_attributes)
        Discount.add_details(order, shopify_order.discount_codes)
        billing_address = shopify_order.billing_address rescue nil
        if !billing_address.blank?
          BillingAddress.add_details(order, billing_address)
        end
        shipping_address = shopify_order.shipping_address rescue nil
        if !shipping_address.blank?
          ShippingAddress.add_details(order, shipping_address)
        end
        payment_details = shopify_order.payment_details rescue nil
        if !payment_details.blank?
          PaymentDetail.add_details(order, payment_details)
        end
      end
    end

And the associated objects:

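    class Taxline < ActiveRecord::Base
      def self.add_details(order, tax_lines)
        tax_lines.each do |tax_line|
          values = {:order_id => order.id, :price => tax_line.price, :rate => tax_line.rate, :title => tax_line.title}
          # NOTE: this looks up by order_id only, so an order with several
          # tax lines keeps overwriting the same Taxline record
          get_taxline = Taxline.where(:order_id => order.id)
          if get_taxline.blank?
            taxline = Taxline.create(values)
          else
            taxline = get_taxline.first
            taxline.update_attributes(values)
          end
        end
      end
    end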

Any better suggestions?

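Try wrapping all of this code in a single database transaction. Since you're on Heroku, the back end will be Postgres. With that many UPDATE statements, you can benefit greatly from running them in one transaction: outside a transaction every statement is committed on its own, and each commit has to wait for Postgres to flush its write-ahead log to disk, whereas inside one transaction there is a single commit at the end, so your code executes much faster. Depending on the back end, you may have to use smaller transactions, but even 100 at a time (then closing the transaction and opening a new one) will greatly improve throughput in Pg.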

http://api.rubyonrails.org/classes/ActiveRecord/Transactions/ClassMethods.html http://www.postgresql.org/docs/9.2/static/sql-set-transaction.html

So before line 2 you would add something like this:

    def add_details(shop, shopify_orders)
      Order.transaction do
        shopify_orders.each do |shopify_order|

And then add another `end` at the very end of your method:

          if !payment_details.blank?
            PaymentDetail.add_details(order, payment_details)
          end
        end # shopify_orders.each
      end # Order.transaction
    end # method
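If a single transaction for all 500 orders turns out to be too large, the answer suggests committing every 100 or so. A minimal sketch of that batching, assuming a hypothetical helper named `each_in_transactions`; `model` can be any object that responds to `transaction { ... }`, such as `Order` in the app:

```ruby
# Hypothetical helper (not part of Rails): yields every item, but groups
# them so that each batch of `batch_size` items runs in one transaction.
# `model` is anything that responds to `transaction { ... }`, e.g. Order.
def each_in_transactions(items, model, batch_size = 100)
  items.each_slice(batch_size) do |batch|
    # All queries issued while processing this batch share one transaction,
    # so there is one commit per batch; an exception rolls back this batch only.
    model.transaction do
      batch.each { |item| yield item }
    end
  end
end
```

In `add_details`, the loop would then read `each_in_transactions(shopify_orders, Order) do |shopify_order| ... end`, with the original loop body unchanged.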

You can monkey-patch ActiveRecord like this:

    class ActiveRecord::Base
      # http://stackoverflow.com/questions/15317837/bulk-insert-records-into-active-record-table?lq=1
      # https://gist.github.com/jackrg/76ade1724bd816292e4e
      # Generates: UPDATE THIS SET THIS.col = VALS.col, ... FROM <table> THIS
      #            JOIN (VALUES (...), ...) VALS (<columns>) ON (THIS.<pk> = VALS.<pk>)
      def self.bulk_update(record_list)
        pk = self.primary_key
        raise "primary_key not found" unless pk.present?
        raise "record_list not an Array of Hashes" unless record_list.is_a?(Array) && record_list.all? { |rec| rec.is_a? Hash }
        return nil if record_list.empty?

        result = nil

        # every hash must contain the primary key(s), so the rows can be JOINed
        record_list.each { |r| raise "Primary Keys '#{self.primary_key.to_s}' not found on record: #{r}" unless hasAllPKs?(r) }

        # list of primary key comparisons
        pk_comparison_array = []
        if pk.is_a?(Array)
          pk.each { |thiskey| pk_comparison_array << "THIS.#{thiskey} = VALS.#{thiskey}" }
        else
          pk_comparison_array << "THIS.#{pk} = VALS.#{pk}"
        end
        pk_comparison = pk_comparison_array.join(' AND ')

        # SQL: one statement per 1000 records
        record_list.each_slice(1000) do |slice|
          key_list, value_list = convert_record_list(slice)
          # csv values
          csv_vals = value_list.map { |v| "(#{v.join(", ")})" }.join(", ")
          # column names
          column_names = key_list.join(", ")
          # column assignments (primary keys and created_at are left untouched)
          columns_assign_array = []
          key_list.each do |col|
            columns_assign_array << "THIS.#{col} = VALS.#{col}" unless inPK?(col) || col == "created_at"
          end
          columns_assign = columns_assign_array.join(', ')

          sql = "UPDATE THIS SET #{columns_assign} FROM #{self.table_name} THIS JOIN ( VALUES #{csv_vals} ) VALS ( #{column_names} ) ON ( #{pk_comparison} )"
          result = self.connection.execute(sql)
        end

        result
      end

      def self.inPK?(str)
        pk = self.primary_key
        test = str.to_s
        if pk.is_a?(Array)
          pk.include?(test)
        else
          pk == test
        end
      end

      # test if given hash has primary keys included as hash keys and those keys are not empty
      def self.hasAllPKs?(hash)
        h = hash.stringify_keys
        pk = self.primary_key
        if pk.is_a?(Array)
          pk.all? { |k| h.key?(k) && h[k].present? }
        else
          h.key?(pk) && h[pk].present?
        end
      end

      def self.convert_record_list(record_list)
        # Build the list of keys
        key_list = record_list.map(&:keys).flatten.map(&:to_s).uniq.sort

        value_list = record_list.map do |rec|
          # accept string or symbol keys without turning false into NULL
          key_list.map { |key| ActiveRecord::Base.connection.quote(rec.key?(key) ? rec[key] : rec[key.to_sym]) }
        end

        # If table has standard timestamps and they're not in the record list then add them to the record list
        time = ActiveRecord::Base.connection.quote(Time.now)
        for field_name in %w(created_at updated_at)
          if self.column_names.include?(field_name) && !(key_list.include?(field_name))
            key_list << field_name
            value_list.each { |rec| rec << time }
          end
        end

        return [key_list, value_list]
      end
    end

Then you can build an array of hashes containing the model's attributes (including its primary key) and do:

    ActiveRecord::Base.transaction do
      Model.bulk_update [
        {attr1: val1, attr2: val2, ...},
        {attr1: val1, attr2: val2, ...},
        ...
      ]
    end

This runs as a single SQL command per batch of 1000 records, bypassing Rails callbacks and validations.
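`bulk_update` only changes rows that already exist (every hash must carry the primary key), so for the create-or-update import in the question the incoming data first has to be split into existing and new records. A minimal pure-Ruby sketch of that step, assuming a hypothetical helper `partition_for_upsert` and a hash `existing_ids` that maps the external Shopify order id to the local primary key (built, for example, from one query over the shop's orders):

```ruby
# Hypothetical helper: splits incoming attribute hashes into
#   to_update: rows for records that already exist, with the primary key
#              "id" merged in so bulk_update can match them
#   to_create: rows with no existing record
# `existing_ids` maps the external key (here "order_id") to the local id.
def partition_for_upsert(rows, existing_ids, external_key = "order_id")
  to_update = []
  to_create = []
  rows.each do |row|
    id = existing_ids[row[external_key]]
    if id
      to_update << row.merge("id" => id)
    else
      to_create << row
    end
  end
  [to_update, to_create]
end
```

`to_update` can then go to `bulk_update`, while `to_create` is the part a bulk-insert tool such as activerecord-import (mentioned in the question) is designed for.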

For PostgreSQL, there are several problems that the approach above doesn't handle:

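  1. You must name the actual table as the UPDATE target, not just an alias.
  2. You cannot repeat the target table in the FROM clause. Since you are joining the target table to a VALUES table (so there is only one table in the FROM clause), you can't use JOIN; you have to use WHERE instead.
  3. In a plain UPDATE command you don't get the same "free" casting from the VALUES table, so you have to cast date/timestamp values explicitly (`val_cast` does this).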

    class ActiveRecord::Base
      def self.update!(record_list)
        raise ArgumentError, "record_list not an Array of Hashes" unless record_list.is_a?(Array) && record_list.all? { |rec| rec.is_a? Hash }
        return record_list if record_list.empty?

        # one UPDATE statement per 1000 records
        record_list.each_slice(1000) do |slice|
          field_list, value_list = convert_record_list(slice)
          key_field = self.primary_key
          # assign everything except the primary key and created_at
          non_key_fields = field_list - [%Q["#{self.primary_key}"], %Q["created_at"]]
          columns_assign = non_key_fields.map { |field| "#{field} = #{val_cast(field)}" }.join(",")
          value_table = value_list.map { |row| "(#{row.join(", ")})" }.join(", ")
          sql = "UPDATE #{table_name} AS this SET #{columns_assign} FROM (VALUES #{value_table}) vals (#{field_list.join(", ")}) WHERE this.#{key_field} = vals.#{key_field}"
          self.connection.update_sql(sql)
        end

        return record_list
      end

      # quoted date/time values in a VALUES list are typed as text, so cast them explicitly
      def self.val_cast(field)
        field = field.gsub('"', '')
        if (column = columns.find { |c| c.name == field }).sql_type =~ /time|date/
          "cast (vals.#{field} as #{column.sql_type})"
        else
          "vals.#{field}"
        end
      end

      def self.convert_record_list(record_list)
        # Build the list of fields
        field_list = record_list.map(&:keys).flatten.map(&:to_s).uniq.sort

        value_list = record_list.map do |rec|
          # accept string or symbol keys without turning false into NULL
          field_list.map { |field| ActiveRecord::Base.connection.quote(rec.key?(field) ? rec[field] : rec[field.to_sym]) }
        end

        # If table has standard timestamps and they're not in the record list then add them to the record list
        time = ActiveRecord::Base.connection.quote(Time.now)
        for field_name in %w(created_at updated_at)
          if self.column_names.include?(field_name) && !(field_list.include?(field_name))
            field_list << field_name
            value_list.each { |rec| rec << time }
          end
        end

        field_list.map! { |field| %Q["#{field}"] }
        return [field_list, value_list]
      end
    end
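To make the three PostgreSQL points concrete, here is a small pure-Ruby sketch that builds the same shape of statement as `update!`: the real table name after UPDATE, the VALUES list as the only FROM item, a WHERE instead of a JOIN, and explicit casts for date/time columns. The method names `postgres_bulk_update_sql` and `quote` are hypothetical, and the naive quoting is for illustration only; in Rails the values should go through `connection.quote`, as the code above does.

```ruby
# Hypothetical, naive SQL quoting for illustration only.
# Real code should use ActiveRecord::Base.connection.quote.
def quote(value)
  case value
  when nil then "NULL"
  when Numeric then value.to_s
  else "'#{value.to_s.gsub("'", "''")}'"
  end
end

# table:   real table name, used as the UPDATE target (point 1)
# pk:      primary key column
# columns: column names present in every row, pk included
# casts:   column => SQL type for values that need an explicit cast (point 3)
def postgres_bulk_update_sql(table, pk, columns, rows, casts = {})
  # SET targets must be plain column names in PostgreSQL (no "this." prefix)
  assignments = (columns - [pk]).map do |col|
    rhs = casts[col] ? "CAST(vals.#{col} AS #{casts[col]})" : "vals.#{col}"
    "#{col} = #{rhs}"
  end
  values = rows.map { |row| "(#{columns.map { |c| quote(row[c]) }.join(', ')})" }
  # Point 2: the VALUES list is the only FROM item, matched with WHERE
  "UPDATE #{table} AS this SET #{assignments.join(', ')} " \
    "FROM (VALUES #{values.join(', ')}) AS vals (#{columns.join(', ')}) " \
    "WHERE this.#{pk} = vals.#{pk}"
end
```

For a single row `{"id" => 1, "total_price" => 10.5, "order_updated_at" => "2013-05-01 10:00:00"}` with `{"order_updated_at" => "timestamp"}` as casts, this produces `UPDATE orders AS this SET total_price = vals.total_price, order_updated_at = CAST(vals.order_updated_at AS timestamp) FROM (VALUES (1, 10.5, '2013-05-01 10:00:00')) AS vals (id, total_price, order_updated_at) WHERE this.id = vals.id`.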