Skip to content

Commit

Permalink
Merge pull request #78 from guyboertje/fix/76
Browse files Browse the repository at this point in the history
fix 76 - change the way that target is created; rework response to quit
  • Loading branch information
Guy Boertje committed Mar 9, 2016
2 parents cf60cb4 + 5ad3848 commit 703862a
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 121 deletions.
2 changes: 1 addition & 1 deletion filewatch.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Gem::Specification.new do |spec|
end

spec.name = "filewatch"
spec.version = "0.8.0"
spec.version = "0.8.1"
spec.summary = "filewatch - file watching for ruby"
spec.description = "Watch files and directories in ruby. Also supports tailing and glob file patterns."
spec.files = files
Expand Down
2 changes: 2 additions & 0 deletions lib/filewatch/observing_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ def subscribe(observer = NullObserver.new)
@logger.warn("unknown event type #{event} for #{path}")
end
end # @watch.subscribe
# when watch.subscribe ends - its because we got quit
_sincedb_write
end # def subscribe

private
Expand Down
8 changes: 3 additions & 5 deletions lib/filewatch/tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@ class Tail
attr_writer :target

def self.new_observing(opts = {})
new.tap do |instance|
instance.target = ObservingTail.new(opts)
end
new({}, ObservingTail.new(opts))
end

def initialize(opts = {})
@target = YieldingTail.new(opts)
def initialize(opts = {}, target = YieldingTail.new(opts))
@target = target
end
end
end
2 changes: 0 additions & 2 deletions lib/filewatch/tail_base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,6 @@ def _sincedb_write
# before the instance is disposed of.
def quit
@watch.quit # <-- should close all the files
# and that should allow the sincedb_write to succeed if it could not before
_sincedb_write
end # def quit

public
Expand Down
238 changes: 129 additions & 109 deletions lib/filewatch/watch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -117,127 +117,147 @@ def inode(path, stat)
def each(&block)
synchronized do
return if @files.empty?

file_deleteable = []
# creates this array just once
watched_files = @files.values

# look at the closed to see if its changed
watched_files.select {|wf| wf.closed? }.each do |watched_file|
path = watched_file.path
begin
stat = watched_file.restat
if watched_file.size_changed? || watched_file.inode_changed?(inode(path,stat))
# if the closed file changed, move it to the watched state
# not to active state because we want to use MAX_OPEN_FILES throttling.
watched_file.watch
begin
file_deletable = []
# creates this array just once
watched_files = @files.values

# look at the closed to see if its changed
watched_files.select {|wf| wf.closed? }.each do |watched_file|
path = watched_file.path
break if quit?
begin
stat = watched_file.restat
if watched_file.size_changed? || watched_file.inode_changed?(inode(path,stat))
# if the closed file changed, move it to the watched state
# not to active state because we want to use MAX_OPEN_FILES throttling.
watched_file.watch
end
rescue Errno::ENOENT
# file has gone away or we can't read it anymore.
file_deletable << path
@logger.debug? && @logger.debug("each: closed?: stat failed: #{path}: (#{$!}), deleting from @files")
rescue => e
@logger.error("each: closed?: #{path}: (#{e.inspect})")
end
rescue Errno::ENOENT
# file has gone away or we can't read it anymore.
file_deleteable << path
@logger.debug? && @logger.debug("each: closed: stat failed: #{path}: (#{$!}), deleting from @files")
rescue => e
@logger.debug? && @logger.debug("each: closed: stat failed: #{path}: (#{e.inspect})")
end
end

# look at the ignored to see if its changed
watched_files.select {|wf| wf.ignored? }.each do |watched_file|
path = watched_file.path
begin
stat = watched_file.restat
if watched_file.size_changed? || watched_file.inode_changed?(inode(path,stat))
# if the ignored file changed, move it to the watched state
# not to active state because we want to use MAX_OPEN_FILES throttling.
# this file has not been yielded to the block yet
# but we must have the tail to start from the end, so when the file
# was first ignored we updated the bytes_read to the stat.size at that time.
# by adding this to the sincedb so that the subsequent modify
# event can detect the change
watched_file.watch
yield(:unignore, watched_file)
return if quit?

# look at the ignored to see if its changed
watched_files.select {|wf| wf.ignored? }.each do |watched_file|
path = watched_file.path
break if quit?
begin
stat = watched_file.restat
if watched_file.size_changed? || watched_file.inode_changed?(inode(path,stat))
# if the ignored file changed, move it to the watched state
# not to active state because we want to use MAX_OPEN_FILES throttling.
# this file has not been yielded to the block yet
# but we must have the tail to start from the end, so when the file
# was first ignored we updated the bytes_read to the stat.size at that time.
# by adding this to the sincedb so that the subsequent modify
# event can detect the change
watched_file.watch
yield(:unignore, watched_file)
end
rescue Errno::ENOENT
# file has gone away or we can't read it anymore.
file_deletable << path
@logger.debug? && @logger.debug("each: ignored: stat failed: #{path}: (#{$!}), deleting from @files")
rescue => e
@logger.error("each: ignored?: #{path}: (#{e.inspect})")
end
rescue Errno::ENOENT
# file has gone away or we can't read it anymore.
file_deleteable << path
@logger.debug? && @logger.debug("each: ignored: stat failed: #{path}: (#{$!}), deleting from @files")
rescue => e
@logger.debug? && @logger.debug("each: ignored: stat failed: #{path}: (#{e.inspect})")
end
end

# Send any creates.
if (to_take = @max_active - watched_files.count{|wf| wf.active?}) > 0
watched_files.select {|wf| wf.watched? }.take(to_take).each do |watched_file|
watched_file.activate
# don't do create again
next if watched_file.state_history_any?(:closed, :ignored)
# if the file can't be opened during the yield
# its state is set back to watched
if watched_file.initial?
yield(:create_initial, watched_file)
else
yield(:create, watched_file)
return if quit?

# Send any creates.
if (to_take = @max_active - watched_files.count{|wf| wf.active?}) > 0
watched_files.select {|wf| wf.watched? }.take(to_take).each do |watched_file|
break if quit?
path = watched_file.path
begin
stat = watched_file.restat
watched_file.activate
# don't do create again
next if watched_file.state_history_any?(:closed, :ignored)
# if the file can't be opened during the yield
# its state is set back to watched
sym = watched_file.initial? ? :create_initial : :create
yield(sym, watched_file)
rescue Errno::ENOENT
# file has gone away or we can't read it anymore.
file_deletable << path
watched_file.unwatch
yield(:delete, watched_file)
next
@logger.debug? && @logger.debug("each: closed: stat failed: #{path}: (#{$!}), deleting from @files")
rescue => e
@logger.error("each: watched?: #{path}: (#{e.inspect})")
end
end
end
else
now = Time.now.to_i
if (now - @lastwarn_max_files) > MAX_FILES_WARN_INTERVAL
waiting = @files.size - @max_active
specific = if @close_older.nil?
", try setting close_older. There are #{waiting} unopened files"
else
", files yet to open: #{waiting}"
else
now = Time.now.to_i
if (now - @lastwarn_max_files) > MAX_FILES_WARN_INTERVAL
waiting = @files.size - @max_active
specific = if @close_older.nil?
", try setting close_older. There are #{waiting} unopened files"
else
", files yet to open: #{waiting}"
end
@logger.warn(@max_warn_msg + specific)
@lastwarn_max_files = now
end
@logger.warn(@max_warn_msg + specific)
@lastwarn_max_files = now
end
end

# wf.active means the actual files were opened
# and have been read once - unless they were empty at the time
watched_files.select {|wf| wf.active? }.each do |watched_file|
path = watched_file.path
begin
stat = watched_file.restat
rescue Errno::ENOENT
# file has gone away or we can't read it anymore.
file_deleteable << path
@logger.debug? && @logger.debug("each: active: stat failed: #{path}: (#{$!}), deleting from @files")
watched_file.unwatch
yield(:delete, watched_file)
next
rescue => e
@logger.debug? && @logger.debug("each: active: stat failed: #{path}: (#{e.inspect})")
next
end
return if quit?

# wf.active means the actual files were opened
# and have been read once - unless they were empty at the time
watched_files.select {|wf| wf.active? }.each do |watched_file|
path = watched_file.path
break if quit?
begin
stat = watched_file.restat
rescue Errno::ENOENT
# file has gone away or we can't read it anymore.
file_deletable << path
@logger.debug? && @logger.debug("each: active: stat failed: #{path}: (#{$!}), deleting from @files")
watched_file.unwatch
yield(:delete, watched_file)
next
rescue => e
@logger.error("each: active?: #{path}: (#{e.inspect})")
next
end

if watched_file.file_closable?
@logger.debug? && @logger.debug("each: active: file expired: #{path}")
yield(:timeout, watched_file)
watched_file.close
next
end
if watched_file.file_closable?
@logger.debug? && @logger.debug("each: active: file expired: #{path}")
yield(:timeout, watched_file)
watched_file.close
next
end

_inode = inode(path,stat)
read_thus_far = watched_file.bytes_read
# we don't update the size here, its updated when we actually read
if watched_file.inode_changed?(_inode)
@logger.debug? && @logger.debug("each: new inode: #{path}: old inode was #{watched_file.inode.inspect}, new is #{_inode.inspect}")
watched_file.update_inode(_inode)
yield(:delete, watched_file)
yield(:create, watched_file)
elsif stat.size < read_thus_far
@logger.debug? && @logger.debug("each: file rolled: #{path}: new size is #{stat.size}, old size #{read_thus_far}")
yield(:delete, watched_file)
yield(:create, watched_file)
elsif stat.size > read_thus_far
@logger.debug? && @logger.debug("each: file grew: #{path}: old size #{read_thus_far}, new size #{stat.size}")
yield(:modify, watched_file)
_inode = inode(path,stat)
read_thus_far = watched_file.bytes_read
# we don't update the size here, its updated when we actually read
if watched_file.inode_changed?(_inode)
@logger.debug? && @logger.debug("each: new inode: #{path}: old inode was #{watched_file.inode.inspect}, new is #{_inode.inspect}")
watched_file.update_inode(_inode)
yield(:delete, watched_file)
yield(:create, watched_file)
elsif stat.size < read_thus_far
@logger.debug? && @logger.debug("each: file rolled: #{path}: new size is #{stat.size}, old size #{read_thus_far}")
yield(:delete, watched_file)
yield(:create, watched_file)
elsif stat.size > read_thus_far
@logger.debug? && @logger.debug("each: file grew: #{path}: old size #{read_thus_far}, new size #{stat.size}")
yield(:modify, watched_file)
end
end
ensure
file_deletable.each {|f| @files.delete(f)}
end

file_deleteable.each {|f| @files.delete(f)}
end
end # def each

Expand All @@ -258,7 +278,7 @@ def subscribe(stat_interval = 1, discover_interval = 5, &block)
reset_quit
while !quit?
each(&block)

break if quit?
glob += 1
if glob == discover_interval
discover
Expand Down
2 changes: 2 additions & 0 deletions lib/filewatch/yielding_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ def subscribe(&block)
@logger.warn("unknown event type #{event} for #{path}")
end
end # @watch.subscribe
# when watch.subscribe ends - its because we got quit
_sincedb_write
end # def subscribe

private
Expand Down
24 changes: 24 additions & 0 deletions spec/observe_tail_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,30 @@
FileUtils.rm_rf(sincedb_path)
end

let(:_cache) { [] }

context "when sincedb path is given but ENV[\"HOME\"] is not given" do
before { _cache << ENV.delete("HOME") }
after { _cache.first.tap{|s| ENV["HOME"] = s unless s.nil?} }

it "should not raise an exception" do
expect do
FileWatch::Tail.new_observing(:sincedb_path => sincedb_path, :stat_interval => 0.05)
end.not_to raise_error
end
end

context "when sincedb path is given but ENV[\"SINCEDB_PATH\"] is not given" do
before { _cache << ENV.delete("SINCEDB_PATH") }
after { _cache.first.tap{|s| ENV["SINCEDB_PATH"] = s unless s.nil?} }

it "should not raise an exception" do
expect do
FileWatch::Tail.new_observing(:sincedb_path => sincedb_path, :stat_interval => 0.05)
end.not_to raise_error
end
end

context "when watching before files exist (start at end)" do
subject { FileWatch::Tail.new_observing(
:sincedb_path => sincedb_path, :stat_interval => 0.05) }
Expand Down
Loading

0 comments on commit 703862a

Please sign in to comment.