Ruby文件读取parallelisim

我有一个包含很多行的文件(比如10亿)。 脚本正在迭代所有这些行,以将它们与另一个数据集进行比较。

由于目前这是在1个线程/ 1核心上运行,我想知道我是否可以启动多个分叉,每个分别同时处理文件的一部分。

到目前为止,我想到的唯一解决方案是sed unix命令。 使用sed,可以读取文件的“切片”(第x行到第y行)。 因此,一些叉子可以处理相应seds的输出。 但问题是Ruby会先将整个sed输出加载到RAM中。

有没有比sed更好的解决方案,或者有没有办法将sed输出“流”到Ruby?

你所要求的并不会真正帮助你。

首先,要跳转到文件的第n行,首先必须读取文件的前一部分,以计算其中的换行符数。 例如:

 $ ruby -e '(1..10000000).each { |i| puts "This is line number #{i}"}' > large_file.txt $ du -h large_file.txt 266M large_file.txt $ purge # mac os x command - clears any in memory disk caches in use $ time sed -n -e "5000000p; 5000000q" large_file.txt This is line number 5000000 sed -n -e "5000000p; 5000000q" large_file.txt 0.52s user 0.13s system 28% cpu 2.305 total $ time sed -n -e "5000000p; 5000000q" large_file.txt This is line number 5000000 sed -n -e "5000000p; 5000000q" large_file.txt 0.49s user 0.05s system 99% cpu 0.542 total 

注意sed命令是如何不是即时的,它必须通读文件的初始部分来确定第500万行的位置。 这就是为什么第二次运行它对我来说要快得多 – 我的计算机将文件缓存到ram中。

即使您将其关闭(通过手动拆分文件),如果您经常在文件的不同部分或文件之间跳转以读取下一行,您的IO性能也会很差。


更好的是在单独的线程(或进程)上处理每个第n行。 这将允许使用多个cpu内核,但仍具有良好的IO性能。 这可以通过并行库轻松完成。

使用示例(我的计算机有4个核心):

 $ ruby -e '(1..10000000).each { |i| puts "This is line number #{i}"}' > large_file.txt # use a smaller file to speed up the tests $ time ruby -r parallel -e "Parallel.each(File.open('large_file.txt').each_line, in_processes: 4) { |line| puts line if (line * 10000) =~ /9999/ }" This is line number 9999 This is line number 19999 This is line number 29999 This is line number 39999 This is line number 49999 This is line number 59999 This is line number 69999 This is line number 79999 This is line number 89999 This is line number 99990 This is line number 99991 This is line number 99992 This is line number 99993 This is line number 99994 This is line number 99995 This is line number 99996 This is line number 99997 This is line number 99999 This is line number 99998 ruby -r parallel -e 55.84s user 10.73s system 400% cpu 16.613 total $ time ruby -r parallel -e "Parallel.each(File.open('large_file.txt').each_line, in_processes: 1) { |line| puts line if (line * 10000) =~ /9999/ }" This is line number 9999 This is line number 19999 This is line number 29999 This is line number 39999 This is line number 49999 This is line number 59999 This is line number 69999 This is line number 79999 This is line number 89999 This is line number 99990 This is line number 99991 This is line number 99992 This is line number 99993 This is line number 99994 This is line number 99995 This is line number 99996 This is line number 99997 This is line number 99998 This is line number 99999 ruby -r parallel -e 47.04s user 7.46s system 97% cpu 55.738 total 

第二个版本(使用4个进程)完成了原始时间的29.81%,快了近4倍。

您可以使用forkthreads来完成此threads 。 在这两种情况下,您都必须编写管理它们的内容,并确定需要多少个子进程,以及每个应该处理的文件行数。

因此,对于第一段代码,您需要扫描文件并确定它包含的行数。 如果您使用的是* nix或Mac OS,则可以使用以下命令执行此操作:

 lc = `wc -l path/to/file`.to_i 

或者只需打开文件并在读取行时递增计数器。 Ruby在这方面做得非常快,但在包含“60亿”行的文件中, wc可能是更好的选择:

 lc = 0 File.foreach('path/to/file') { lc += 1 } 

除以您要管理的子流程数量:

 NUM_OF_PROCESSES = 5 lines_per_process = lc/NUM_OF_PROCESSES 

然后启动您的流程,告诉他们从哪里开始处理,以及有多少行:

 require 'threads' children = [] 1.step(lc, lines_per_process) do |start_line| children << Thread.new do cur_line = 0 File.foreach('path/to/file') do |li| cur_line += 1 next unless (cur_line === start_line .. (start_line + lines_per_process) # ... do something with the lines read end end end # wait for them to finish children.each { |c| c.join } 

那是未经测试的,但我是从哪里开始的。