---- processor在tail_mode->processor.rb defprocess_all_states(watched_files) process_closed(watched_files) returnif watch.quit? process_ignored(watched_files) returnif watch.quit? process_delayed_delete(watched_files) returnif watch.quit? process_restat_for_watched_and_active(watched_files) returnif watch.quit? process_rotation_in_progress(watched_files) returnif watch.quit? process_watched(watched_files) returnif watch.quit? process_active(watched_files) end 。。。 defprocess_watched(watched_files) logger.trace("Watched processing") to_take = @settings.max_active - watched_files.count{|wf| wf.active?} if to_take > 0 watched_files.select {|wf| wf.watched?}.take(to_take).each do |watched_file| watched_file.activate if watched_file.initial? create_initial(watched_file) else create(watched_file) end breakif watch.quit? end else now = Time.now.to_i if (now - watch.lastwarn_max_files) > MAX_FILES_WARN_INTERVAL waiting = watched_files.size - @settings.max_active logger.warn(@settings.max_warn_msg + ", files yet to open: #{waiting}") watch.lastwarn_max_files = now end end end 。。。 defcreate_initial(watched_file) @create_initial.handle(watched_file) end
------discove 入口 discover.rb defdiscover_any_files(path, ongoing) fileset = Dir.glob(path).select{|f| File.file?(f)} logger.trace("discover_files", "count" => fileset.size) logger.warn("discover_files", "count" => fileset.size) fileset.each do |file| pathname = Pathname.new(file) new_discovery = false watched_file = @watched_files_collection.watched_file_by_path(file) if watched_file.nil? begin path_stat = PathStatClass.new(pathname) rescueErrno::ENOENT next end watched_file = WatchedFile.new(pathname, path_stat, @settings) new_discovery = true logger.info("discover_files handling:", "new:"=> new_discovery, "watched_file:" => watched_file.details) end # if it already unwatched or its excluded then we can skip nextif watched_file.unwatched? || can_exclude?(watched_file, new_discovery) logger.trace("discover_files handling:", "new discovery"=> new_discovery, "watched_file details" => watched_file.details) if new_discovery watched_file.initial_completed if ongoing # initially when the sincedb collection is filled with records from the persistence file # each value is not associated with a watched file # a sincedb_value can be: # unassociated # associated with this watched_file # associated with a different watched_file if@sincedb_collection.associate(watched_file) if watched_file.file_ignorable? logger.trace("Discoverer discover_files: #{file}: skipping because it was last modified more than #{@settings.ignore_older} seconds ago") logger.info("Discoverer discover_files: #{file}: skipping because it was last modified more than #{@settings.ignore_older} seconds ago") # on discovery ignorable watched_files are put into the ignored state and that # updates the size from the internal stat # so the existing contents are not read. # because, normally, a newly discovered file will # have a watched_file size of zero # they are still added to the collection so we know they are there for the next periodic discovery watched_file.ignore_as_unread end # now add the discovered file to the watched_files collection and adjust the sincedb collections @watched_files_collection.add(watched_file) end end # at this point the watched file is created, is in the db but not yet opened or being processed end end
------最终发现重命名的inode文件调用是在 tail_mode->handlers->base.rb base.rb defhandle(watched_file) logger.trace("handling: #{watched_file.filename}") logger.info("handling: #{watched_file.filename}") unless watched_file.has_listener? watched_file.set_listener(@observer) end handle_specifically(watched_file) end
defget_new_value_specifically(watched_file) position = watched_file.position_for_new_sincedb_value value = SincedbValue.new(position) value.set_watched_file(watched_file) watched_file.update_bytes_read(position) value end ---> creat_initial.rb moduleFileWatchmoduleTailModemoduleHandlers classCreateInitial < Base defhandle_specifically(watched_file) if open_file(watched_file) logger.trace("handle_specifically opened file handle: #{watched_file.file.fileno}, path: #{watched_file.filename}") logger.info("handle_specifically opened file handle: #{watched_file.file.fileno}, path: #{watched_file.filename}") add_or_update_sincedb_collection(watched_file) end end
defupdate_existing_specifically(watched_file, sincedb_value) position = watched_file.last_stat_size if@settings.start_new_files_at == :beginning position = 0 end logger.trace("update_existing_specifically - #{watched_file.path}: seeking to #{position}") logger.info("update_existing_specifically - #{watched_file.path}: seeking to #{position}") watched_file.update_bytes_read(position) sincedb_value.update_position(position) end end endendend ---> base.rb defadd_or_update_sincedb_collection(watched_file) sincedb_value = @sincedb_collection.find(watched_file) if sincedb_value.nil? sincedb_value = add_new_value_sincedb_collection(watched_file) watched_file.initial_completed elsif sincedb_value.watched_file == watched_file update_existing_sincedb_collection_value(watched_file, sincedb_value) watched_file.initial_completed else msg = "add_or_update_sincedb_collection: found sincedb record" logger.trace(msg, "sincedb key" => watched_file.sincedb_key, "sincedb value" => sincedb_value ) # detected a rotation, Discoverer can't handle this because this watched file is not a new discovery. # we must handle it here, by transferring state and have the sincedb value track this watched file # rotate_as_file and rotate_from will switch the sincedb key to the inode that the path is now pointing to # and pickup the sincedb_value from before. msg = "add_or_update_sincedb_collection: the found sincedb_value has a watched_file - this is a rename, switching inode to this watched file" logger.trace(msg) existing_watched_file = sincedb_value.watched_file if existing_watched_file.nil? sincedb_value.set_watched_file(watched_file) logger.trace("add_or_update_sincedb_collection: switching as new file") watched_file.rotate_as_file watched_file.update_bytes_read(sincedb_value.position) else sincedb_value.set_watched_file(watched_file) logger.trace("add_or_update_sincedb_collection: switching from...", "watched_file details" => watched_file.details) watched_file.rotate_from(existing_watched_file) end end sincedb_value end
watched_file.rb--> defposition_for_new_sincedb_value if@initial # this file was found in first discovery @settings.start_new_files_at == :beginning ? 0 : last_stat_size else # always start at the beginning if found after first discovery 0 end end