inotify_tracker.go 5.9 KB
// Copyright (c) 2015 HPE Software Inc. All rights reserved.
// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.

package watch

import (
	"log"
	"os"
	"path/filepath"
	"sync"
	"syscall"

	"github.com/nxadm/tail/util"

    "github.com/fsnotify/fsnotify"
)

// InotifyTracker multiplexes one fsnotify.Watcher across every file watched
// through this package.
//
// mux guards the chans, done and watchNums maps. chans and done are keyed by
// the cleaned filename passed to Watch/WatchCreate: chans holds the channel on
// which events for that filename are delivered, and done holds a channel that
// is closed on removal so that a pending delivery can be abandoned.
// watchNums counts registrations per path actually handed to the watcher
// (the file itself, or its parent directory for create watches).
//
// watch and remove carry requests to the run goroutine; error carries the
// result of each request back to the requester.
type InotifyTracker struct {
	mux       sync.Mutex
	watcher   *fsnotify.Watcher
	chans     map[string]chan fsnotify.Event
	done      map[string]chan bool
	watchNums map[string]int
	watch     chan *watchInfo
	remove    chan *watchInfo
	error     chan error
}

// watchInfo describes a single watch or remove request. op is fsnotify.Create
// for requests made through WatchCreate/RemoveWatchCreate and is left at its
// zero value otherwise; fname is the file the request refers to.
type watchInfo struct {
	op    fsnotify.Op
	fname string
}

// isCreate reports whether the request targets file creation, i.e. whether
// its op is exactly fsnotify.Create.
func (w *watchInfo) isCreate() bool {
	return w.op == fsnotify.Create
}

var (
	// shared is the package-wide InotifyTracker, so that a single
	// fsnotify.Watcher serves every watched file.
	shared *InotifyTracker

	// once guards goRun so the shared tracker is built and started only once.
	once sync.Once

	// goRun allocates the shared tracker and launches its run loop in a new
	// goroutine.
	goRun = func() {
		tracker := &InotifyTracker{
			chans:     make(map[string]chan fsnotify.Event),
			done:      make(map[string]chan bool),
			watchNums: make(map[string]int),
			watch:     make(chan *watchInfo),
			remove:    make(chan *watchInfo),
			error:     make(chan error),
		}
		shared = tracker
		go tracker.run()
	}

	// logger writes to standard error with the standard log flags.
	logger = log.New(os.Stderr, "", log.LstdFlags)
)

// Watch signals the run goroutine to begin watching the input filename
func Watch(fname string) error {
	return watch(&watchInfo{
		fname: fname,
	})
}

// Watch create signals the run goroutine to begin watching the input filename
// if call the WatchCreate function, don't call the Cleanup, call the RemoveWatchCreate
func WatchCreate(fname string) error {
	return watch(&watchInfo{
		op:    fsnotify.Create,
		fname: fname,
	})
}

// watch normalizes winfo.fname with filepath.Clean, hands the request to the
// run goroutine and returns the error value that goroutine sends back.
func watch(winfo *watchInfo) error {
	// Lazily start the shared tracker's run loop on first use.
	once.Do(goRun)

	winfo.fname = filepath.Clean(winfo.fname)

	// Submit the request, then block until the run goroutine replies.
	shared.watch <- winfo
	err := <-shared.error
	return err
}

// RemoveWatch signals the run goroutine to remove the watch for the input filename
func RemoveWatch(fname string) error {
	return remove(&watchInfo{
		fname: fname,
	})
}

// RemoveWatch create signals the run goroutine to remove the watch for the input filename
func RemoveWatchCreate(fname string) error {
	return remove(&watchInfo{
		op:    fsnotify.Create,
		fname: fname,
	})
}

// remove normalizes winfo.fname with filepath.Clean, closes and forgets the
// done channel registered for that name (if any), then hands the request to
// the run goroutine and returns the error value it sends back.
func remove(winfo *watchInfo) error {
	// Lazily start the shared tracker's run loop on first use.
	once.Do(goRun)

	name := filepath.Clean(winfo.fname)
	winfo.fname = name

	// Close the done channel before submitting the request: sendEvent selects
	// on it, so a delivery that is still pending for this name is abandoned.
	shared.mux.Lock()
	if stop := shared.done[name]; stop != nil {
		delete(shared.done, name)
		close(stop)
	}
	shared.mux.Unlock()

	// Submit the request, then block until the run goroutine replies.
	shared.remove <- winfo
	err := <-shared.error
	return err
}

// Events returns the channel on which events for fname are delivered. The
// channel is closed when the watch on fname is removed.
//
// fname is normalized with filepath.Clean before the lookup, matching the key
// under which Watch and WatchCreate register the channel. If fname is not
// currently watched the returned channel is nil.
func Events(fname string) <-chan fsnotify.Event {
	// Make sure the shared tracker exists, as watch and remove do, so that
	// calling Events before any watch was requested does not dereference a
	// nil tracker.
	once.Do(goRun)

	name := filepath.Clean(fname)

	shared.mux.Lock()
	defer shared.mux.Unlock()

	return shared.chans[name]
}

// Cleanup removes the watch for the input filename if necessary.
func Cleanup(fname string) error {
	return RemoveWatch(fname)
}

// addWatch registers winfo with the tracker. It makes sure an events channel
// and a done channel exist for winfo.fname, then adds the target path to the
// underlying watcher unless that path is already counted in watchNums. The
// target is the file itself, or its parent directory for create watches.
//
// The registration count for the target is incremented only when no error
// occurred; the error from the watcher, if any, is returned.
func (shared *InotifyTracker) addWatch(winfo *watchInfo) error {
	shared.mux.Lock()
	defer shared.mux.Unlock()

	key := winfo.fname
	if shared.chans[key] == nil {
		shared.chans[key] = make(chan fsnotify.Event)
	}
	if shared.done[key] == nil {
		shared.done[key] = make(chan bool)
	}

	target := key
	if winfo.isCreate() {
		// Creation is observed through the parent directory.
		target = filepath.Dir(key)
	}

	// Only the first registration for a path touches the watcher; later ones
	// just bump the count.
	if shared.watchNums[target] == 0 {
		if err := shared.watcher.Add(target); err != nil {
			return err
		}
	}
	shared.watchNums[target]++
	return nil
}

// removeWatch unregisters winfo from the tracker. It closes and forgets the
// events channel for winfo.fname, decrements the registration count of the
// watched path (the file itself, or its parent directory for create
// watches), and removes that path from the underlying watcher when the last
// registration goes away.
//
// Removing a path that has no registration leaves the count untouched and
// returns nil. The count is never driven below zero, because a negative count
// would make a later addWatch skip adding the path to the watcher.
func (shared *InotifyTracker) removeWatch(winfo *watchInfo) error {
	shared.mux.Lock()

	ch := shared.chans[winfo.fname]
	if ch != nil {
		delete(shared.chans, winfo.fname)
		close(ch)
	}

	fname := winfo.fname
	if winfo.isCreate() {
		// Create watches are registered on the parent directory.
		fname = filepath.Dir(fname)
	}

	last := false
	if n := shared.watchNums[fname]; n > 1 {
		shared.watchNums[fname] = n - 1
	} else {
		// n == 1: this was the last registration.
		// n <= 0: nothing was registered; just make sure no entry lingers.
		last = n == 1
		delete(shared.watchNums, fname)
	}
	shared.mux.Unlock()

	if !last {
		return nil
	}

	// If we were the last ones to watch this file, unsubscribe from inotify.
	// This needs to happen after releasing the lock because fsnotify waits
	// synchronously for the kernel to acknowledge the removal of the watch
	// for this file, which causes us to deadlock if we still held the lock.
	return shared.watcher.Remove(fname)
}

// sendEvent forwards event to the events channel registered for the cleaned
// event name. Events for names without both an events channel and a done
// channel are dropped. The send blocks until the event is received or the
// done channel for that name is closed.
func (shared *InotifyTracker) sendEvent(event fsnotify.Event) {
	key := filepath.Clean(event.Name)

	// Snapshot both channels under the lock; the send itself happens unlocked.
	shared.mux.Lock()
	out, stop := shared.chans[key], shared.done[key]
	shared.mux.Unlock()

	if out == nil || stop == nil {
		return
	}

	select {
	case out <- event:
	case <-stop:
	}
}

// run is the tracker's event loop. It creates the fsnotify.Watcher, then
// serves watch and remove requests, forwards watcher events through
// sendEvent, and logs watcher errors other than an *os.SyscallError wrapping
// EINTR. The loop ends when the watcher's Events or Errors channel is closed.
func (shared *InotifyTracker) run() {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		// Include the underlying error so the failure can be diagnosed.
		// NOTE(review): util.Fatal is assumed to be printf-style and not to
		// return — confirm against the util package.
		util.Fatal("failed to create Watcher: %v", err)
	}
	shared.watcher = watcher

	for {
		select {
		case winfo := <-shared.watch:
			// Reply to the requester blocked in watch().
			shared.error <- shared.addWatch(winfo)

		case winfo := <-shared.remove:
			// Reply to the requester blocked in remove().
			shared.error <- shared.removeWatch(winfo)

		case event, open := <-shared.watcher.Events:
			if !open {
				return
			}
			shared.sendEvent(event)

		case err, open := <-shared.watcher.Errors:
			if !open {
				return
			}
			if err == nil {
				continue
			}
			// An interrupted system call is not worth reporting.
			if sysErr, ok := err.(*os.SyscallError); ok && sysErr.Err == syscall.EINTR {
				continue
			}
			logger.Printf("Error in Watcher Error channel: %s", err)
		}
	}
}