view feed-push @ 2:292896092ee6 draft

chmod +x rc.d/feed-push Signed-off-by: Changaco <changaco ατ changaco δοτ net>
author Changaco <changaco ατ changaco δοτ net>
date Mon, 16 Apr 2012 00:09:37 +0200
parents a68d7feeba88
children 455cd8c78862
line wrap: on
line source

#!/usr/bin/env python2
# -*- coding: utf-8 -*-

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import argparse
import calendar
from   functools  import partial, reduce
from   glob       import glob
import json
import os
from   os.path    import abspath
import shlex
from   subprocess import Popen, PIPE, STDOUT
import sys
from   syslog     import *
import time

import feedparser
import gamin


# Constants

log_levels = ['DEBUG', 'INFO', 'NOTICE', 'WARNING', 'ERR', 'CRIT', 'ALERT', 'EMERG']

gamin_events = {
    1:'GAMChanged', 2:'GAMDeleted', 3:'GAMStartExecuting', 4:'GAMStopExecuting',
    5:'GAMCreated', 6:'GAMMoved', 7:'GAMAcknowledge', 8:'GAMExists', 9:'GAMEndExist'
}


# Generic utils

concat = lambda l: reduce(list.__add__, l, [])

def dict_append(d, k, v):
    d.setdefault(k, [])
    d[k].append(v)


# Logging

def log(*args):
    if len(args) == 1:
        priority, msg = LOG_INFO, args[0]
    elif len(args) == 2:
        priority, msg = args
    else:
        return
    priority = 7 - priority
    if priority < global_args.log_level:
        return
    if global_args.syslog:
        syslog(*args)
    else:
        sys.stderr.write(log_levels[priority]+': '+msg+'\n')

def ignore_event(path, event):
    log(LOG_DEBUG, 'ignoring event '+gamin_events.get(event, str(event))+' on '+path)


# Config parsing

def parse_config_file(config_fd):
    feeds_paths = config_to_feed_paths_to_commands[config_fd.name] = {}
    cmd = None
    next_cmd = []
    config_fd.seek(0)
    for i, line in enumerate(config_fd):
        line = line.strip()
        if len(line) == 0 or line[0] == '#':
            continue
        if line[0] == '%':
            next_cmd.append(line[1:].rstrip(';'))
        elif not next_cmd:
            log(LOG_ERR, 'missing command in file '+config_fd.name+' before line '+str(i))
            return
        else:
            cmd = '; '.join(next_cmd)
            next_cmd = []
            for feed_path in glob(line):
                feed_path = abspath(feed_path)
                dict_append(feeds_paths, feed_path, cmd)
                if not feed_path in path_to_feed_fd:
                    monitor.watch_file(feed_path, handle_feed_change)
    log('successfully parsed config file '+config_fd.name)


# Gamin callbacks

def handle_config_change(path, event):
    path = abspath(path)
    if os.path.isdir(path):
        ignore_event(path, event)
    elif not path in path_to_config_fd:
        open_config(path, event)
    elif event in [gamin.GAMChanged, gamin.GAMDeleted]:
        update_config(path, event)
    else:
        ignore_event(path, event)

def open_config(path, event):
    if event in [gamin.GAMCreated, gamin.GAMExists]:
        if (not path.endswith('.conf') or path[0] == '.') and not hasattr(global_args.config, 'read'):
            return log('ignoring '+path+' (not a valid config file name)')
        try:
            config_fd = open(path)
        except IOError as e:
            return log('failed to open "'+line+'" '+str(e))
        path_to_config_fd[path] = config_fd
        parse_config_file(config_fd)
    else:
        ignore_event(path, event)

def update_config(path, event):
    feeds_paths = set(concat(config_to_feed_paths_to_commands.values()))
    if event == gamin.GAMChanged:
        log('updating actions from modified config file '+config_fd.name)
        parse_config_file(path_to_config_fd[path])
    elif event == gamin.GAMDeleted:
        log('removing actions from deleted config file '+config_fd.name)
        config_to_feed_paths_to_commands.pop(path)
        path_to_config_fd.pop(path).close()
    new_feeds_paths = set(concat(config_to_feed_paths_to_commands.values()))
    for feed_path in feeds_paths.difference(new_feeds_paths):
        monitor.stop_watch(feed_path)
        if feed_path in path_to_feed_fd:
            path_to_feed_fd.pop(feed_path).close()

def handle_feed_change(path, event):
    if path not in path_to_feed_fd:
        if event in [gamin.GAMCreated, gamin.GAMExists, gamin.GAMChanged]:
            try:
                feed_fd = path_to_feed_fd[path] = open(path)
            except IOError as e:
                return log('failed to open "'+path+'": '+str(e))
            feed = feedparser.parse(feed_fd.read())
            handle_feed_change(path, gamin.GAMChanged)
        else:
            ignore_event(path, event)
    elif event == gamin.GAMChanged:
        feed_fd = path_to_feed_fd[path]
        feed_fd.seek(0)
        feed = feedparser.parse(feed_fd.read())
        for entry in reversed(feed.entries):
            if entry.id in state['id_cache'].get(feed_fd.name, []) or \
               not global_args.flood and calendar.timegm(entry.published_parsed) < time.time() - 86400:
                continue
            for feed_path_to_commands in config_to_feed_paths_to_commands.values():
                for cmd in feed_path_to_commands.get(path, []):
                    run_command(format_cmd(cmd, feed=feed.feed, entry=entry), entry.content[0].value)
        state['id_cache'][feed_fd.name] = [entry.id for entry in feed.entries]
        save_state()
    elif event == gamin.GAMDeleted:
        path_to_feed_fd.pop(path).close()
    else:
        ignore_event(path, event)

def save_state():
    global_args.state_file.truncate(0)
    json.dump(state, global_args.state_file)
    global_args.state_file.flush()


# Commands utils

def format_cmd(cmd, **kwargs):
    """The safe equivalent of str.format() for shell commands, meaning interpolated variables can't do shell injections (I hope)."""
    r = u''
    for arg in shlex.split(cmd.encode('utf8')):
        a = arg.decode('utf8')
        b = a.format(**kwargs)
        if a != b:
            r += u" '" + b.replace(u"'", u'\'"\'"\'') + u"'"
        else:
            r += u' ' + arg
    return r.lstrip()

def run_command(cmd, input):
    p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT, shell=True)
    output = p.communicate(input.encode('utf8'))[0].decode('utf8')
    if p.returncode != 0:
        log(LOG_ERR, 'command failed: '+cmd+'\n'+output)
    else:
        log(LOG_INFO, 'successfully executed '+cmd)
        log(LOG_DEBUG, '===== output:\n'+output)


# Argparse utils

def AbsPath(next_type=None):
    def f(s):
        p = abspath(s)
        if next_type is not None:
            return next_type(p)
        else:
            return p
    return f

class Apply(argparse.Action):
    def __init__(self, f, *args, **kwargs):
        super(self.__class__, self).__init__(**kwargs)
        self.f = f
    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, self.f(values[0]))

def Directory(s):
    try:
        os.listdir(s)
        return s
    except OSError as e:
        raise argparse.ArgumentTypeError(str(e))

def File(flags):
    def f(s):
        try:
            return os.fdopen(os.open(s, flags), 'w')
        except OSError as e:
            raise argparse.ArgumentTypeError(str(e))
    return f

class First(argparse.Action):
    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, values[0])

def FirstOf(*types, **kwargs):
    kwargs.setdefault('error', 'argument "{}" is not valid')
    def f(s):
        for t in types:
            try:
                return t(s)
            except:
                pass
        raise argparse.ArgumentTypeError(error.format(s))
    return f


# Main

if __name__ == '__main__':

    p = argparse.ArgumentParser()
    p.add_argument('config', type=FirstOf(AbsPath(argparse.FileType('r')), AbsPath(Directory), error='"{}" is neither a file nor a directory'), help='either a file or a directory')
    p.add_argument('state_file', type=argparse.FileType('a+'), help='e.g. /var/lib/feed-push/state')
    p.add_argument('--flood', default=False, action='store_true', help='push all articles on startup instead of ignoring the ones older than 24h (useful for debugging)')
    p.add_argument('--fork', metavar='pid-file', nargs=1, type=File(os.O_WRONLY|os.O_CREAT|os.O_EXCL), action=First, help='useful in init scripts')
    p.add_argument('--log-level', nargs=1, default=1, choices=log_levels, action=partial(Apply, log_levels.index), help='default is INFO')
    p.add_argument('--syslog', default=False, action='store_true', help='log to syslog instead of stderr')
    global_args = p.parse_args()

    if global_args.fork:
        pid = os.fork()
        if pid != 0:
            global_args.fork.write(str(pid))
            exit(0)

    if global_args.syslog:
        openlog(facility=LOG_DAEMON)

    state = {'id_cache': {}}
    saved_state = global_args.state_file.read().strip()
    if len(saved_state) > 0:
        state.update(json.loads(saved_state))
    del saved_state

    monitor = gamin.WatchMonitor()
    path_to_feed_fd = {}
    path_to_config_fd = {}
    config_to_feed_paths_to_commands = {}
    if hasattr(global_args.config, 'read'):
        os.chdir(os.path.dirname(global_args.config.name))
        monitor.watch_file(global_args.config.name, handle_config_change)
    else:
        os.chdir(global_args.config)
        monitor.watch_directory(global_args.config, handle_config_change)

    while True:
        monitor.handle_one_event()