view feed-push @ 34:030e8b24f8b7 draft default tip master

pretty print JSON in state file Signed-off-by: Changaco <changaco ατ changaco δοτ net>
author Changaco <changaco ατ changaco δοτ net>
date Sun, 05 Aug 2012 15:18:28 +0200
parents adde3e451237
children
line wrap: on
line source

#!/usr/bin/env python2
# -*- coding: utf-8 -*-

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import argparse
import calendar
from   functools  import partial, reduce
from   glob       import glob
import json
import os
from   os.path    import abspath, dirname, isdir
import shlex
from   subprocess import Popen, PIPE, STDOUT
import sys
from   syslog     import *
import time
import traceback

import feedparser


# Constants

# Names of the log levels.  log() converts a syslog priority into an index
# of this list by computing `7 - priority`, and --log-level stores such an
# index as the minimum level to report.
log_levels = [
    'DEBUG',
    'INFO',
    'NOTICE',
    'WARNING',
    'ERR',
    'CRIT',
    'ALERT',
    'EMERG',
]

# Readable names for gamin event codes, used when logging ignored events.
gamin_events = {
    1: 'GAMChanged',
    2: 'GAMDeleted',
    3: 'GAMStartExecuting',
    4: 'GAMStopExecuting',
    5: 'GAMCreated',
    6: 'GAMMoved',
    7: 'GAMAcknowledge',
    8: 'GAMExists',
    9: 'GAMEndExist',
}


# Generic utils

def concat(l):
    """Concatenate an iterable of lists into a single new list.

    Every item must be a list: the pieces are combined with list.__add__,
    which raises TypeError for anything else.
    """
    joined = []
    for piece in l:
        joined = list.__add__(joined, piece)
    return joined

def dict_append(d, k, v):
    """Append v to the list stored under key k in d, creating the list if needed."""
    d.setdefault(k, []).append(v)


# Logging

def log(*args):
    """Log a message to syslog (when daemonized) or to stderr.

    Accepted call forms:
        log(msg)            -- logs msg with priority LOG_INFO
        log(priority, msg)  -- priority is one of the syslog LOG_* constants
    Any other number of arguments is silently ignored.

    Messages below the level selected with --log-level are dropped.
    If writing the message fails, the program exits with status 1.
    """
    if len(args) == 1:
        priority, msg = LOG_INFO, args[0]
    elif len(args) == 2:
        priority, msg = args
    else:
        return
    # syslog priorities and log_levels indexes run in opposite directions
    # (LOG_DEBUG is 7 but 'DEBUG' is log_levels[0]), hence the inversion.
    level = 7 - priority
    if level < global_args.log_level:
        return
    if isinstance(msg, unicode):
        msg = msg.encode('utf8')
    try:
        if global_args.fork:
            # syslog() must receive the real priority, not the inverted
            # index, otherwise DEBUG messages are reported as EMERG.
            syslog(priority, msg)
        else:
            sys.stderr.write(log_levels[level]+': '+msg+'\n')
    except Exception:
        # Nowhere left to report the problem: give up.
        sys.exit(1)

def ignore_event(path, event):
    """Emit a debug message saying that `event` on `path` is not handled."""
    # Fall back to the numeric code for events missing from gamin_events.
    event_name = gamin_events.get(event, str(event))
    log(LOG_DEBUG, 'ignoring event '+event_name+' on '+path)


# Config parsing

def parse_config_file(config_path):
    """Parse a config file and start watching the feeds it mentions.

    File format, line by line (after stripping whitespace):
      - empty lines and lines starting with '#' are skipped;
      - lines starting with '%' are command lines; consecutive command
        lines form one block and are joined with '; ';
      - any other line is a glob pattern of feed files, which are
        associated with the most recent command block.

    The result is stored in config_to_feed_paths_to_commands[config_path],
    replacing whatever was recorded for that file before.  Parsing stops
    with an error message if a feed line appears before any command line.
    """
    try:
        config_fd = open(config_path)
    except IOError as e:
        return log('failed to open "'+config_path+'": '+str(e))
    feeds_paths = config_to_feed_paths_to_commands[config_path] = {}
    pending = []   # '%' lines of the command block being collected
    cmd = None     # joined command of the current block, once feeds follow it
    log('parsing config file '+config_path)
    with config_fd:
        for lineno, line in enumerate(config_fd, 1):
            line = line.strip()
            if not line or line[0] == '#':
                continue
            if line[0] == '%':
                if cmd is not None:
                    # Feed lines were already seen: this starts a new block.
                    pending, cmd = [], None
                pending.append(line[1:].rstrip(';'))
                continue
            if cmd is None:
                if not pending:
                    log(LOG_ERR, 'missing command in file '+config_path+' before line '+str(lineno))
                    return
                # Join only once per block; later feed lines of the same
                # block reuse the joined string as is.
                cmd = '; '.join(pending)
            for feed_path in glob(line):
                feed_path = abspath(feed_path)
                dict_append(feeds_paths, feed_path, cmd)
                if feed_path not in watched_feeds:
                    monitor.watch_file(feed_path, handler(handle_feed_change))
                    watched_feeds.add(feed_path)
                    log('now watching '+feed_path)


# Gamin callbacks

def handler(f):
    """Wrap the gamin callback `f` so that its errors are logged, not raised.

    The returned function takes the same (path, event) arguments, calls `f`
    and logs the traceback of any Exception with priority LOG_CRIT.
    KeyboardInterrupt and SystemExit are deliberately not intercepted so
    that the program can still be stopped while a callback is running.
    """
    def g(path, event):
        try:
            f(path, event)
        except Exception:
            log(LOG_CRIT, traceback.format_exc())
    return g

def handle_config_change(path, event):
    """Gamin callback for the watched config file or config directory.

    Directories are ignored.  A path that has no recorded config yet is
    handed to open_config(); a known one is handed to update_config() when
    it changed or was deleted.  Every other event is ignored.
    """
    path = abspath(path)
    if not os.path.isdir(path):
        if path not in config_to_feed_paths_to_commands:
            open_config(path, event)
            return
        if event in (gamin.GAMChanged, gamin.GAMDeleted):
            update_config(path, event)
            return
    ignore_event(path, event)

def open_config(path, event):
    """Parse a config file that is seen for the first time.

    Only GAMCreated and GAMExists events trigger parsing; other events are
    ignored.  When the program watches a directory (global_args.config has
    no `read` attribute), files whose name does not end with '.conf' or
    starts with a dot are skipped.
    """
    if event not in (gamin.GAMCreated, gamin.GAMExists):
        ignore_event(path, event)
        return
    single_file_mode = hasattr(global_args.config, 'read')
    # The caller passes an absolute path, so the hidden-file test has to
    # look at the file name rather than at the first character of the path.
    name = os.path.basename(path)
    if not single_file_mode and (not name.endswith('.conf') or name.startswith('.')):
        return log('ignoring '+path+' (not a valid config file name)')
    parse_config_file(path)

def update_config(path, event):
    """React to the modification or deletion of an already known config file.

    A changed file is parsed again, a deleted one has its actions dropped.
    Feeds that no config file references afterwards stop being watched.
    """
    def referenced_feeds():
        # Every feed path mentioned by at least one config file.
        return {
            feed_path
            for feed_to_cmds in config_to_feed_paths_to_commands.values()
            for feed_path in feed_to_cmds.keys()
        }

    before = referenced_feeds()
    if event == gamin.GAMChanged:
        parse_config_file(path)
    elif event == gamin.GAMDeleted:
        log('removing actions from deleted config file '+path)
        config_to_feed_paths_to_commands.pop(path)
    after = referenced_feeds()
    for orphan in before.difference(after):
        monitor.stop_watch(orphan)
        watched_feeds.discard(orphan)
        log('stopped watching '+orphan)

def handle_feed_change(path, event):
    """Gamin callback for a watched feed file: push its new entries.

    On GAMCreated, GAMExists and GAMChanged the feed is parsed and every
    entry whose id is not in the id cache is passed to all commands that
    the config files associate with `path`.  Unless --flood was given,
    entries published more than 24 hours ago are skipped.  Afterwards the
    id cache of the feed is replaced by the ids currently in the feed and
    the state is saved.  Other events are ignored.
    """
    if event not in (gamin.GAMCreated, gamin.GAMExists, gamin.GAMChanged):
        ignore_event(path, event)
        return
    try:
        feed_fd = open(path)
    except IOError as e:
        return log('failed to open "'+path+'": '+str(e))
    # Close the file even if reading or parsing raises.
    with feed_fd:
        feed = feedparser.parse(feed_fd.read())
    id_cache = state['id_cache']
    # The cache is keyed by the feed's link when it has one, else by its path.
    feed_id = feed.feed.get('link', path)
    if feed_id != path and path in id_cache:
        # Migrate ids that were cached under the path to the link key.
        id_cache[feed_id] = id_cache.pop(path)
    # The cached ids do not change during the loop: build the lookup once.
    seen_ids = set(id_cache.get(feed_id, []))
    pushed = 0
    # NOTE(review): entries are walked in reverse, presumably to push the
    # oldest first -- confirm that feeds list newest entries first.
    # NOTE(review): the loop reads entry.id, entry.published_parsed and
    # entry.content; an entry lacking one of them will presumably raise and
    # abort the processing of this feed -- confirm against feedparser.
    for entry in reversed(feed.entries):
        if entry.id in seen_ids:
            continue
        if not global_args.flood and calendar.timegm(entry.published_parsed) < time.time() - 86400:
            continue
        pushed += 1
        for feed_path_to_commands in config_to_feed_paths_to_commands.values():
            for cmd in feed_path_to_commands.get(path, []):
                run_command(format_cmd(cmd, feed=feed.feed, entry=entry), entry.content[0].value)
    id_cache[feed_id] = [entry.id for entry in feed.entries]
    save_state()
    if pushed == 0:
        log('no new entry in %s' % path)

def save_state():
    """Rewrite the state file with the current content of `state` as JSON.

    The output is indented by 4 spaces unless --no-pretty was given.
    """
    out = global_args.state_file
    out.truncate(0)
    json.dump(state, out, indent=4 if global_args.pretty else None)
    out.flush()


# Commands utils

def format_cmd(cmd, **kwargs):
    """The safe equivalent of str.format() for shell commands, meaning interpolated variables can't do shell injections (I hope).

    `cmd` is split into shell words; each word is formatted with `kwargs`
    and, if formatting changed it, wrapped in single quotes with embedded
    single quotes escaped.  `cmd` may be a byte string (decoded as UTF-8)
    or a text string; the result is always a text string.

    NOTE(review): words that formatting leaves unchanged are re-joined
    without the quotes that the split removed, so a quoted argument
    containing spaces in the config comes out as several words.
    """
    if isinstance(cmd, bytes):
        cmd = cmd.decode('utf8')
    if str is bytes:
        # Python 2: hand shlex a byte string, then go back to text.
        words = [w.decode('utf8') for w in shlex.split(cmd.encode('utf8'))]
    else:
        words = shlex.split(cmd)
    parts = []
    for word in words:
        formatted = word.format(**kwargs)
        if formatted != word:
            # Close the quote, emit a double-quoted ', reopen the quote.
            parts.append(u"'" + formatted.replace(u"'", u'\'"\'"\'') + u"'")
        else:
            parts.append(word)
    return u' '.join(parts)

def run_command(cmd, input):
    """Run `cmd` through the shell, feeding it `input` on stdin, and log the result.

    `input` is encoded as UTF-8.  stdout and stderr of the command are
    captured together; the output is logged at LOG_ERR level with a failure
    message when the exit status is non-zero, at LOG_DEBUG level otherwise.
    """
    proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT, shell=True)
    raw_output = proc.communicate(input.encode('utf8'))[0]
    # The output is only used for logging: never let undecodable bytes
    # raise after the command has already been executed.
    output = raw_output.decode('utf8', 'replace')
    if proc.returncode != 0:
        log(LOG_ERR, 'command failed: '+cmd+'\n'+output)
    else:
        log(LOG_INFO, 'successfully executed '+cmd)
        log(LOG_DEBUG, '===== output:\n'+output)


# Argparse utils

def AbsPath(next_type=None):
    """Build an argparse type that makes its argument an absolute path.

    When `next_type` is given, the absolute path is passed on to it and its
    result is returned instead.
    """
    def f(raw):
        absolute = abspath(raw)
        return absolute if next_type is None else next_type(absolute)
    return f

class Apply(argparse.Action):
    """argparse action storing f(first value) in the namespace.

    `f` is the first constructor argument; use functools.partial to bind it
    before handing the class to add_argument(action=...).  The remaining
    keyword arguments are those of argparse.Action.
    """
    def __init__(self, f, *args, **kwargs):
        # Name the class explicitly: super(self.__class__, self) would call
        # this very method again when invoked on a subclass instance.
        super(Apply, self).__init__(**kwargs)
        self.f = f
    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, self.f(values[0]))

def MakeDirs(next_type=None):
    """Build an argparse type that creates the parent directory of a path.

    The returned function creates the missing parent directories of its
    argument, then returns the argument itself or, when `next_type` is
    given, the result of next_type(argument).  A failure to create the
    directories is reported as an argparse.ArgumentTypeError.
    """
    def f(path):
        parent = dirname(path)
        if parent and not isdir(parent):
            try:
                os.makedirs(parent)
            except OSError as err:
                raise argparse.ArgumentTypeError(str(err))
        return path if next_type is None else next_type(path)
    return f

def Directory(s):
    """argparse type accepting only paths that can be listed as a directory.

    Returns `s` unchanged, or raises argparse.ArgumentTypeError carrying the
    message of the OSError raised by os.listdir().
    """
    try:
        os.listdir(s)
    except OSError as err:
        raise argparse.ArgumentTypeError(str(err))
    return s

def File(flags):
    """Build an argparse type that opens its argument with os.open() flags.

    The returned function opens the path with the given `flags` and wraps
    the descriptor in a file object opened for writing.  An OSError is
    reported as an argparse.ArgumentTypeError.
    """
    def f(path):
        try:
            descriptor = os.open(path, flags)
            return os.fdopen(descriptor, 'w')
        except OSError as err:
            raise argparse.ArgumentTypeError(str(err))
    return f

class First(argparse.Action):
    """argparse action that stores only the first of the parsed values."""
    def __call__(self, parser, namespace, values, option_string=None):
        first_value = values[0]
        setattr(namespace, self.dest, first_value)

def FirstOf(*types, **kwargs):
    """Build an argparse type that tries several types in order.

    The returned function returns the result of the first of `types` that
    accepts its argument without raising.  If none does, it raises
    argparse.ArgumentTypeError with the `error` keyword argument formatted
    with the rejected string (default: 'argument "{}" is not valid').
    """
    error = kwargs.setdefault('error', 'argument "{}" is not valid')
    def f(s):
        for t in types:
            try:
                return t(s)
            except Exception:
                # This type rejected the value: try the next one.
                pass
        raise argparse.ArgumentTypeError(error.format(s))
    return f


# Main

if __name__ == '__main__':

    # Command line parsing.  The result is kept in the module-level
    # global_args, which log(), save_state() and the callbacks read.
    p = argparse.ArgumentParser()
    p.add_argument('config', type=FirstOf(AbsPath(argparse.FileType('r')), AbsPath(Directory), error='"{}" is neither a file nor a directory'), help='either a file or a directory')
    p.add_argument('state_file', type=MakeDirs(argparse.FileType('a+')), help='e.g. /var/lib/feed-push/state')
    p.add_argument('--flood', default=False, action='store_true', help='push all articles on startup instead of ignoring the ones older than 24h (useful for debugging)')
    p.add_argument('--fork', metavar='pid-file', nargs=1, type=File(os.O_WRONLY|os.O_CREAT|os.O_EXCL), action=First, help='daemonize and log to syslog')
    p.add_argument('--log-level', nargs=1, default=1, choices=log_levels, action=partial(Apply, log_levels.index), help='default is INFO')
    p.add_argument('--no-pretty', dest='pretty', default=True, action='store_false', help='don\'t pretty print JSON in state file')
    global_args = p.parse_args()

    # Daemonize with a double fork: the original process exits, the first
    # child starts a new session, forks again, records the pid of the
    # second child in the pid file and exits too.
    if global_args.fork:
        pid = os.fork()
        if pid != 0:
            exit(0)
        os.setsid()
        pid = os.fork()
        if pid != 0:
            global_args.fork.write(str(pid))
            exit(0)
        openlog(facility=LOG_DAEMON)
        # Redirect the standard streams of the daemon to /dev/null.
        null = open('/dev/null', 'r+')
        for f in [sys.stdin, sys.stdout, sys.stderr]:
            f.flush()
            os.dup2(null.fileno(), f.fileno())

    # Load the saved state, if the state file contains any.
    state = {'id_cache': {}}
    # The file is opened in 'a+' mode, whose initial read position is
    # platform-dependent: rewind explicitly before reading.
    global_args.state_file.seek(0)
    saved_state = global_args.state_file.read().strip()
    if len(saved_state) > 0:
        state.update(json.loads(saved_state))
    del saved_state

    # Watch either the single config file or the whole config directory.
    # The working directory is moved next to the config.
    import gamin
    monitor = gamin.WatchMonitor()
    watched_feeds = set()
    config_to_feed_paths_to_commands = {}
    if hasattr(global_args.config, 'read'):
        os.chdir(os.path.dirname(global_args.config.name))
        monitor.watch_file(global_args.config.name, handler(handle_config_change))
    else:
        os.chdir(global_args.config)
        monitor.watch_directory(global_args.config, handler(handle_config_change))

    # Event loop: one event at a time when daemonized, otherwise poll once
    # per second and exit cleanly on Ctrl-C.
    if global_args.fork:
        while True:
            monitor.handle_one_event()
    else:
        try:
            while True:
                monitor.handle_events()
                time.sleep(1)
        except KeyboardInterrupt:
            exit(0)