root/trunk/daemon/file_monitor.rb

Revision 1052, 14.5 kB (checked in by julians, 4 years ago)

Added migration for removing path column from files table; changed file_monitor so that path column is no longer set when new files are created

Line 
1 class FileMonitor
2   cattr_accessor :log_all_sql     # class-level flag; passed negated (!log_all_sql) as the third argument of the Earth::Directory.benchmark / Earth::File.benchmark calls in update_non_recursive
3   cattr_accessor :console_writer  # class-level accessor; NOTE(review): not read or written anywhere in this class body - presumably used by callers, confirm
4
5   class ETAPrinter
6     def initialize(number_of_items)
7       @number_of_items = number_of_items
8       @items_completed = 0
9       @longest_output = 0
10       @last_eta_update = 0
11       @min_eta_update_delta = 1.seconds
12       @start = Time.new
13     end
14
15     def increment()
16       @items_completed += 1
17       now = Time.new
18       time_per_item = (now - @start) / @items_completed
19       items_remaining = @number_of_items - @items_completed
20       if items_remaining > 0
21         if @last_eta_update.to_i + @min_eta_update_delta <= now.to_i
22           @last_eta_update = now
23           time_remaining = items_remaining * time_per_item
24           eta_string = "ETA: #{(Time.local(2007) + (time_remaining)).strftime('%H:%M:%S')}s"
25           print(eta_string + "\010" * eta_string.size)
26           $stdout.flush
27           @longest_output = [@longest_output, eta_string.size].max
28         end
29       else
30         print(" " * @longest_output + "\010" * @longest_output)
31       end
32     end
33   end
34  
35   # Default is false. Set this to true if you want to see the individual SQL commands
36   self.log_all_sql = false
37  
38   # TODO: Check that paths are not overlapping
39   def FileMonitor.start(paths, only_initial_update = false, force_update_time = nil)
40
41     server = Earth::Server.this_server
42     server.daemon_version = ApplicationHelper.earth_version
43     server.save!
44
45     at_exit { server = Earth::Server.this_server; server.daemon_version = nil; server.save! }
46
47     # Turn the paths into an absolute path
48     paths = paths.map{|p| File.expand_path(p)}
49    
50     # Find the current directory that it's watching
51     directories = Earth::Directory.roots_for_server(server)
52     previous_paths = directories.map{|d| d.path}
53    
54     start_heartbeat_thread
55    
56     paths_to_start_watching = paths - previous_paths
57     paths_to_stop_watching = previous_paths - paths
58    
59     if !paths_to_stop_watching.empty?
60       raise "To stop watching #{paths_to_stop_watching} clear out the database (with --clear)"
61     end
62    
63     directories.each do |directory|
64       benchmark "Collecting startup data for directory #{directory.path} from database" do
65         directory.load_all_children(0, { :include => [ :cached_sizes ] })
66         directory.cached_sizes.reload
67       end
68     end
69     paths_to_start_watching.each do |path|
70       benchmark "Doing initial pass on new path #{path}" do
71         directories << FileMonitor.initial_pass_on_new_directory(path)
72       end
73     end
74    
75     server = Earth::Server.this_server
76     server.last_update_finish_time = Time.new
77     server.save!
78    
79     run(directories, force_update_time) unless only_initial_update
80   end
81  
82   def FileMonitor.directory_saved(node)
83     @directory_eta_printer.increment if @directory_eta_printer
84   end
85  
86   # Remove all directories on this server from the database
87   def FileMonitor.database_cleanup
88     this_server = Earth::Server.this_server
89     benchmark "Clearing old data for this server out of the database" do
90       Earth::Directory.delete_all "server_id=#{this_server.id}"
91     end 
92     this_server.last_update_finish_time = nil
93     this_server.save!
94   end
95  
96 private  # NOTE(review): a bare `private` only applies to instance methods defined with plain `def`; the `def FileMonitor.xxx` singleton methods that follow stay publicly callable
97
98   @@benchmark_output_enabled = true  # class variable: benchmark prints only while this is true; silent_benchmark switches it off temporarily
99
100   def FileMonitor.silent_benchmark
101     prev_benchmark_output_enabled = @@benchmark_output_enabled
102     @@benchmark_output_enabled = false
103     yield
104     @@benchmark_output_enabled = prev_benchmark_output_enabled
105   end
106
107   def FileMonitor.benchmark(description = nil, show_pending = true)
108     if @@benchmark_output_enabled and description and show_pending
109       print "#{description} ... "
110       $stdout.flush
111     end
112     time_before = Time.new
113     result = yield
114     duration = Time.new - time_before
115     if @@benchmark_output_enabled and description
116       if not show_pending
117         print "#{description} "
118       end
119       puts "took #{duration}s"
120     end
121     result
122   end
123
124   def FileMonitor.start_heartbeat_thread
125     ActiveRecord::Base.allow_concurrency = true
126     puts "Starting heartbeat thread..."
127     Thread.new do
128       while true do
129         # reload the server object in case of changes on the database side
130         # TODO: There is a small chance that the server object will get stale here
131         server = Earth::Server.this_server
132         logger.debug("Heartbeat time:   About to update at time #{Time.now}")
133         server.heartbeat
134         logger.debug("Heartbeat time: Finished updating at time #{Time.now}")
135         sleep(server.heartbeat_interval)
136       end
137     end
138   end
139  
140   def FileMonitor.initial_pass_on_new_directory(name, parent = nil)
141     this_server = Earth::Server.this_server
142
143     benchmark "Scanning and storing tree", false do
144    
145       if parent
146         directory = parent.children.build(:name => name, :path => "#{parent.path}/#{name}", :server_id => this_server.id)
147       else
148         directory = this_server.directories.build(:name => name, :path => name)
149       end
150       directory_count = benchmark "Building initial directory structure for #{name}" do
151         update([directory], 0, :only_build_directories => true, :initial_pass => false, :show_eta => false)
152       end
153
154       benchmark "Committing initial directory structure for #{name} to database" do
155         @directory_eta_printer = ETAPrinter.new(directory_count) unless parent
156         Earth::Directory.add_save_observer(self) unless parent
157         Earth::Directory.cache_enabled = false
158         directory.save
159         Earth::Directory.cache_enabled = true
160         Earth::Directory.remove_save_observer(self) unless parent
161         @directory_eta_printer = nil
162       end
163
164       directory.reload
165       directory.load_all_children
166
167       benchmark "Initial pass at gathering all files beneath #{name}" do
168         update([directory], 0, :only_build_directories => false, :initial_pass => true, :show_eta => parent.nil?)
169       end
170
171       benchmark "Creating cache information" do
172         ActiveRecord::Base.logger.debug("begin create cache");
173         @cached_size_eta_printer = ETAPrinter.new(directory_count) unless parent
174         directory.create_caches_recursively(@cached_size_eta_printer)
175         @cached_size_eta_printer = nil
176         ActiveRecord::Base.logger.debug("end create cache");
177       end
178
179       #benchmark "Vacuuming database" do
180       #  Earth::File.connection.update("VACUUM FULL ANALYZE")
181       #end
182
183       directory.load_all_children(0, { :include => [ :cached_sizes ] })
184       directory.cached_sizes.reload
185
186       directory
187     end
188   end
189
190   def FileMonitor.run(directories, force_update_time=nil)
191     while true do
192       # At the beginning of every update get the server information in case it changes on the database
193       server = Earth::Server.this_server
194       update_time = force_update_time || server.update_interval
195       # Hmmm.. children_count doesn't include itself in the count
196       directory_count = directories.map{|d| d.children_count + 1}.sum
197       puts "Updating #{directory_count} directories over #{update_time}s..."
198       update(directories, update_time)     
199     end
200   end
201  
202   def FileMonitor.update(directories, update_time = 0, *args)
203     options = { :only_build_directories => false, :initial_pass => false, :show_eta => false }
204     options.update(args.first) if args.first
205     # TODO: Do this in a nicer way
206     total_count = 0
207     remaining_count = directories.map{|d| d.children_count + 1}.sum
208     eta_printer = ETAPrinter.new(remaining_count) if options[:show_eta]
209     start = Time.new
210     logger.debug("starting update cycle, directories.size is #{directories.size} remaining count is #{remaining_count}")
211     directories.each do |directory|
212       directory.each do |d|
213         total_count += update_non_recursive(d, options)
214         remaining_time = update_time - (Time.new - start)
215         if remaining_time > 0 && remaining_count > 0
216           sleep_time = remaining_time.to_f / remaining_count
217           sleep (sleep_time)
218         end
219         remaining_count -= 1
220  
221         eta_printer.increment if options[:show_eta]
222       end
223     end
224     stop = Time.new
225     logger.debug("Update cycle took #{stop - start}s, remaining_count is #{remaining_count}")
226
227     if not options[:only_build_directories] and not options[:initial_pass]
228       # Set the last_update_finish_time
229       server = Earth::Server.this_server
230       server.last_update_finish_time = Time.new
231       server.save!
232     end
233    
234     total_count
235   end
236
237   def FileMonitor.logger
238     RAILS_DEFAULT_LOGGER
239   end
240
241   def FileMonitor.update_non_recursive(directory, options)
        # Brings the database record of ONE directory in line with the file system.
        #
        # directory - directory record; this method uses its path, stat, children,
        #             files, cached_sizes, server_id and name.
        # options   - hash; the keys read here are :only_build_directories and
        #             :initial_pass.
        #
        # Existing subdirectories are not descended into. Newly found ones are
        # either built in memory and processed by a recursive call
        # (:only_build_directories) or handed to initial_pass_on_new_directory.
        #
        # Returns 1 on every path: directory_count starts at 1 and is never
        # changed, and the early exit returns the literal 1.
242
243     directory_count = 1
244
245     begin
246       new_directory_stat = File.lstat(directory.path)
247     rescue Errno::ENOENT
248       # Handle case when the directory no longer exists
249       new_directory_stat = nil
250
251       logger.debug("update_non_recursive for directory #{directory.path} -> removed")
252     end
253    
254     # If directory hasn't changed then return
        # The early exit is taken when the fresh stat equals the stored one (this
        # includes both being nil) or when the directory's mtime lies within the
        # last second.
        # NOTE(review): the one-second guard presumably postpones directories that
        # are still being written to until a later cycle - confirm.
255     if new_directory_stat == directory.stat or \
256       (not new_directory_stat.nil? and new_directory_stat.mtime >= 1.seconds.ago)
257
258       if new_directory_stat.nil?
259         logger.debug("update_non_recursive for directory #{directory.path} -> just removed")
260       elsif new_directory_stat == directory.stat
261         logger.debug("update_non_recursive for directory #{directory.path} -> not changed")
262       else
263         logger.debug("update_non_recursive for directory #{directory.path} -> changed less than 1 second ago (#{new_directory_stat.mtime})")
264       end
265
          # A saved directory that has no cached_sizes rows yet gets them created here.
266       if directory.cached_sizes.empty? and not directory.new_record?
267         Earth::Directory::transaction do
268           directory.create_caches
269           directory.update_caches
270           directory.cached_sizes.reload
271         end
272       end
273
274       return 1
275     end
276
277
        # Everything below runs inside a single Earth::Directory transaction.
278     Earth::Directory::transaction do
279
          # The listing is only read when the directory exists and its stat reports
          # it readable and executable. Otherwise the empty defaults stay in place,
          # so every known file and child is treated as deleted further down.
280       file_names, subdirectory_names, stats = [], [], Hash.new
281       if new_directory_stat && new_directory_stat.readable? && new_directory_stat.executable?
282         begin
283           file_names, subdirectory_names, stats = contents(directory)
284         rescue Errno::ENOENT
285           # It's possible that the directory was deleted before
286           # Dir.entries could be called. Therefore, ignore this
287           # exception and treat it like an unreadable directory
288         end
289       end
290
291       logger.debug("update_non_recursive for directory #{directory.path} -> changed, subdirectories are #{subdirectory_names.inspect}")
292
          # Subdirectories present on disk but not among directory.children.
293       added_directory_names = subdirectory_names - directory.children.map{|x| x.name}
294       added_directory_names.each do |name|
295
296         Earth::Directory.benchmark("Creating directory #{directory.path}/#{name}", Logger::DEBUG, !log_all_sql) do
297           if options[:only_build_directories] then
                # Build the child in memory only and process it right away; the
                # value returned by the recursive call is discarded.
298             attributes = { :name => name, :path => "#{directory.path}/#{name}", :server_id => directory.server_id }
299             dir = directory.children.build(attributes)
300             update_non_recursive(dir, options)
301           else
                # Full initial pass for the new subtree, with benchmark output silenced.
302             FileMonitor.silent_benchmark { initial_pass_on_new_directory(name, directory) }
303           end
304         end
305       end
306
307       if not options[:only_build_directories] then
308         # By adding and removing files on the association, the cache of the association will be kept up to date
            # On the initial pass every listed file is created without consulting
            # directory.files.
309         if not options[:initial_pass]
310           added_file_names = file_names - directory.files.map{|x| x.name}
311         else
312           added_file_names = file_names
313         end
314         added_file_names.each do |name|
315           Earth::File.benchmark("Creating file with name #{name}", Logger::DEBUG, !log_all_sql) do
316             directory.files.create(:name => name, :stat => stats[name])
317           end
318         end
319
320         if not options[:initial_pass]
              # Iterate over a copy because files are deleted from the association
              # inside the loop.
321           directory_files = directory.files.to_ary.clone
322          
323           directory_files.each do |file|
324             # If the file still exists
325             if file_names.include?(file.name)
326               logger.debug("checking for update on file #{file.name}")
327               # If the file has changed
328               if file.stat != stats[file.name]
329                 file.stat = stats[file.name]
330                 Earth::File.benchmark("Updating file with name #{file.name}", Logger::DEBUG, !log_all_sql) do
331                   file.save
332                 end
333               end
334               # If the file has been deleted
335             else
336               Earth::Directory.benchmark("Removing file with name #{file.name}", Logger::DEBUG, !log_all_sql) do
337                 directory.files.delete(file)
338               end
339             end
340           end
341         end
342       end
343      
          # Copy for the same reason as above: children are removed while iterating.
344       directory_children = directory.children.to_ary.clone
345
346       directory_children.each do |dir|
347         # If the directory has been deleted
348         if !subdirectory_names.include?(dir.name)
349          
350           Earth::Directory.benchmark("Removing directory with name #{dir.name}", Logger::DEBUG, !log_all_sql) do
351             directory.child_delete(dir)
352           end
353         end
354       end
355      
356       # Update the directory stat information at the end
357       if not options[:only_build_directories]
358         if File.exist?(directory.path) # FIXME - why is this checked again? can this lead to database inconsistency wrt recursive sizes?
359           directory.stat = new_directory_stat
360
361           # This will not overwrite 'lft' and 'rgt' so it doesn't matter if these are out of date
362           Earth::Directory.benchmark("Updating directory with name #{directory.name}", Logger::DEBUG, !log_all_sql) do
363             directory.update
364           end
365
366         end
367       end
368     end
369    
370     # Removes the files in this directory from the cache (so that they don't take up memory)
371     # However, they will get reloaded automatically from the database the next time this
372     # directory changes
373     directory.files.reset
374
375     directory_count
376   end
377  
378   def FileMonitor.contents(directory)
379     entries = Dir.entries(directory.path)
380     # Ignore ".' and ".." directories
381     entries.delete(".")
382     entries.delete("..")
383    
384     # Quote names that can not be converted to UTF8
385     quote = QuoteBadCharacters.new
386     entries.map!{|text| quote.quote(text)}
387    
388     # Contains the stat information for both files and directories
389     stats = Hash.new
390     entries.each do |file|
391       begin
392         stats[file] = File.lstat(File.join(directory.path, file))
393       rescue Errno::ENOENT
394         # It's possible that the file was deleted after the call to
395         # Dir.entries and prior to the invocation of
396         # File.lstat. Therefore, ignore this exception.
397       end
398     end
399    
400     # Seperately test for whether it's a file or a directory because it could
401     # be something like a symbolic link (which we shouldn't follow)
402     file_names = entries.select{|x| stats.keys.include?(x) and stats[x].file?}
403     subdirectory_names = entries.select{|x| stats.keys.include?(x) and stats[x].directory?}
404    
405     return file_names, subdirectory_names, stats
406   end
407 end
Note: See TracBrowser for help on using the browser.