Mercurial > feed-push
view feed-push @ 2:292896092ee6 draft
chmod +x rc.d/feed-push
Signed-off-by: Changaco <changaco ατ changaco δοτ net>
author | Changaco <changaco ατ changaco δοτ net> |
---|---|
date | Mon, 16 Apr 2012 00:09:37 +0200 |
parents | a68d7feeba88 |
children | 455cd8c78862 |
line wrap: on
line source
#!/usr/bin/env python2
# -*- coding: utf-8 -*-

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""feed-push: run shell commands for new entries of local feed files.

Config files map shell commands to globs of feed files. Config files and
feed files are watched with gamin; when a watched feed file changes, each
entry whose id is not in the saved id cache is piped (its content) into the
commands configured for that feed, with the ``{feed}`` / ``{entry}`` format
fields of each command interpolated and shell-quoted.
"""

import argparse
import calendar
from functools import partial, reduce
from glob import glob
import json
import os
from os.path import abspath
import shlex
from subprocess import Popen, PIPE, STDOUT
import sys
from syslog import *
import time

import feedparser
import gamin


# Constants

# Level names indexed by 7 - syslog priority: LOG_DEBUG -> 'DEBUG',
# LOG_EMERG -> 'EMERG'. --log-level stores an index into this list.
log_levels = ['DEBUG', 'INFO', 'NOTICE', 'WARNING', 'ERR', 'CRIT', 'ALERT', 'EMERG']

# Names of gamin event codes, used only in debug messages.
gamin_events = {
    1: 'GAMChanged', 2: 'GAMDeleted', 3: 'GAMStartExecuting',
    4: 'GAMStopExecuting', 5: 'GAMCreated', 6: 'GAMMoved',
    7: 'GAMAcknowledge', 8: 'GAMExists', 9: 'GAMEndExist',
}

# Without --flood, entries dated more than this many seconds ago are skipped.
MAX_ENTRY_AGE = 86400


# Generic utils

def concat(l):
    """Flatten one level of nesting: concat([[1], [2, 3]]) == [1, 2, 3].

    Every item of l must be a list.
    """
    return reduce(list.__add__, l, [])


def dict_append(d, k, v):
    """Append v to the list stored under key k in d, creating it if needed."""
    d.setdefault(k, []).append(v)


# Logging

def log(*args):
    """Log a message to syslog (--syslog) or stderr.

    Call as log(msg) for an INFO message, or log(priority, msg) with a syslog
    priority constant such as LOG_DEBUG or LOG_ERR. Messages less severe than
    --log-level are dropped.
    """
    if len(args) == 1:
        priority, msg = LOG_INFO, args[0]
    elif len(args) == 2:
        priority, msg = args
    else:
        raise TypeError('log() takes 1 or 2 arguments (%d given)' % len(args))
    level = 7 - priority  # index into log_levels
    if level < global_args.log_level:
        return
    # Feed titles and command output are unicode; on Python 2, syslog() and
    # writes to a non-UTF-8 stderr would encode them as ASCII and raise.
    if not isinstance(msg, str):
        msg = msg.encode('utf8')
    if global_args.syslog:
        syslog(priority, msg)
    else:
        sys.stderr.write(log_levels[level] + ': ' + msg + '\n')


def ignore_event(path, event):
    """Log at DEBUG level that a gamin event on path is not acted upon."""
    log(LOG_DEBUG, 'ignoring event ' + gamin_events.get(event, str(event)) + ' on ' + path)


# Config parsing

def parse_config_file(config_fd):
    """(Re)parse an open config file and start watching the feeds it names.

    Format: blank lines and lines starting with '#' are ignored; a line
    starting with '%' is a shell command fragment (consecutive fragments are
    joined with '; '); any other line is a glob of feed files that the
    pending command applies to. Each feed glob consumes the pending command,
    so every glob needs its own '%' lines. The parsed mapping replaces this
    file's previous entry in config_to_feed_paths_to_commands; parsing stops
    at the first glob with no pending command.
    """
    feeds_paths = config_to_feed_paths_to_commands[config_fd.name] = {}
    next_cmd = []
    config_fd.seek(0)
    for lineno, line in enumerate(config_fd, 1):
        line = line.strip()
        if not line or line.startswith('#'):
            continue
        if line.startswith('%'):
            next_cmd.append(line[1:].rstrip(';'))
        elif not next_cmd:
            log(LOG_ERR, 'missing command in file ' + config_fd.name + ' before line ' + str(lineno))
            return
        else:
            cmd = '; '.join(next_cmd)
            next_cmd = []
            # Globs are relative to the cwd, which __main__ sets to the
            # config file's directory.
            for feed_path in glob(line):
                feed_path = abspath(feed_path)
                dict_append(feeds_paths, feed_path, cmd)
                if feed_path not in path_to_feed_fd:
                    monitor.watch_file(feed_path, handle_feed_change)
    log('successfully parsed config file ' + config_fd.name)


def _referenced_feed_paths():
    """Return the set of feed paths referenced by any parsed config file."""
    return set(concat([list(feeds) for feeds in config_to_feed_paths_to_commands.values()]))


# Gamin callbacks

def handle_config_change(path, event):
    """gamin callback for the watched config file or config directory."""
    path = abspath(path)
    if os.path.isdir(path):
        ignore_event(path, event)
    elif path not in path_to_config_fd:
        open_config(path, event)
    elif event in (gamin.GAMChanged, gamin.GAMDeleted):
        update_config(path, event)
    else:
        ignore_event(path, event)


def open_config(path, event):
    """Open and parse a config file that gamin reports as created or existing.

    When a directory is watched, only non-hidden files whose name ends in
    '.conf' are treated as config files.
    """
    if event not in (gamin.GAMCreated, gamin.GAMExists):
        return ignore_event(path, event)
    name = os.path.basename(path)
    watching_single_file = hasattr(global_args.config, 'read')
    if not watching_single_file and (not name.endswith('.conf') or name.startswith('.')):
        return log('ignoring ' + path + ' (not a valid config file name)')
    try:
        config_fd = open(path)
    except IOError as e:
        return log(LOG_ERR, 'failed to open "' + path + '": ' + str(e))
    path_to_config_fd[path] = config_fd
    parse_config_file(config_fd)


def update_config(path, event):
    """Re-parse a modified config file or forget a deleted one.

    Afterwards, feeds that no config file references any more are unwatched
    and their open file is closed.
    """
    old_feeds_paths = _referenced_feed_paths()
    if event == gamin.GAMChanged:
        log('updating actions from modified config file ' + path)
        parse_config_file(path_to_config_fd[path])
    elif event == gamin.GAMDeleted:
        log('removing actions from deleted config file ' + path)
        config_to_feed_paths_to_commands.pop(path, None)
        path_to_config_fd.pop(path).close()
    for feed_path in old_feeds_paths - _referenced_feed_paths():
        monitor.stop_watch(feed_path)
        if feed_path in path_to_feed_fd:
            path_to_feed_fd.pop(feed_path).close()


def handle_feed_change(path, event):
    """gamin callback for a watched feed file.

    The first time a feed file shows up it is opened and then processed as
    if it had changed; on later changes its unseen entries are pushed (see
    _push_new_entries); on deletion its file is closed.
    """
    if path not in path_to_feed_fd:
        if event in (gamin.GAMCreated, gamin.GAMExists, gamin.GAMChanged):
            try:
                path_to_feed_fd[path] = open(path)
            except IOError as e:
                return log(LOG_ERR, 'failed to open "' + path + '": ' + str(e))
            handle_feed_change(path, gamin.GAMChanged)
        else:
            ignore_event(path, event)
    elif event == gamin.GAMChanged:
        _push_new_entries(path)
    elif event == gamin.GAMDeleted:
        path_to_feed_fd.pop(path).close()
    else:
        ignore_event(path, event)


def _entry_id(entry):
    """Return an identifier for a feed entry: its id, else link, else title."""
    return entry.get('id') or entry.get('link') or entry.get('title')


def _entry_timestamp(entry):
    """Return the entry's published (else updated) date as a Unix timestamp,
    or None when the feed gives no parsable date."""
    parsed = entry.get('published_parsed') or entry.get('updated_parsed')
    if parsed is None:
        return None
    return calendar.timegm(parsed)


def _entry_content(entry):
    """Return the entry's body: its first content element, else its summary."""
    content = entry.get('content')
    if content:
        return content[0].value
    return entry.get('summary', u'')


def _push_new_entries(path):
    """Run the configured commands for every unseen entry of the feed at path.

    Entries are handled in reverse document order (presumably oldest first).
    An entry is skipped if its id was recorded on a previous run or, unless
    --flood is given, if it is dated more than MAX_ENTRY_AGE seconds ago
    (undated entries are never skipped by age). The ids of all current
    entries are then saved to the state file.
    """
    feed_fd = path_to_feed_fd[path]
    feed_fd.seek(0)
    feed = feedparser.parse(feed_fd.read())
    seen_ids = set(state['id_cache'].get(path, []))
    min_time = time.time() - MAX_ENTRY_AGE
    for entry in reversed(feed.entries):
        if _entry_id(entry) in seen_ids:
            continue
        if not global_args.flood:
            timestamp = _entry_timestamp(entry)
            if timestamp is not None and timestamp < min_time:
                continue
        for feed_path_to_commands in config_to_feed_paths_to_commands.values():
            for cmd in feed_path_to_commands.get(path, []):
                run_command(format_cmd(cmd, feed=feed.feed, entry=entry), _entry_content(entry))
    state['id_cache'][path] = [_entry_id(entry) for entry in feed.entries]
    save_state()


def save_state():
    """Overwrite the state file with the current state serialized as JSON."""
    # The file is opened in 'a+' mode, so after truncating, writes land at 0.
    global_args.state_file.truncate(0)
    json.dump(state, global_args.state_file)
    global_args.state_file.flush()


# Commands utils

def format_cmd(cmd, **kwargs):
    """The safe equivalent of str.format() for shell commands, meaning
    interpolated variables can't do shell injections (I hope).

    cmd is split into words with shlex. Each word that formatting changes is
    single-quoted, so interpolated values reach the command as literal text.
    Words without format fields are passed through verbatim so shell
    operators (';', '|', redirections) keep working; note that quotes shlex
    removed from such words are not restored.
    """
    words = []
    # Python 2's shlex does not handle unicode, hence the UTF-8 round-trip.
    for raw in shlex.split(cmd.encode('utf8')):
        word = raw.decode('utf8')
        formatted = word.format(**kwargs)
        if formatted != word:
            # Close the quote, emit "'" and reopen: the standard way to embed
            # a single quote inside a single-quoted shell string.
            words.append(u"'" + formatted.replace(u"'", u'\'"\'"\'') + u"'")
        else:
            # Use the decoded word: mixing raw UTF-8 bytes with unicode raises
            # UnicodeDecodeError on Python 2 for non-ASCII words.
            words.append(word)
    return u' '.join(words)


def run_command(cmd, input):
    """Run cmd through the shell with input (unicode) on its stdin.

    stdout and stderr are captured together and logged: the whole output at
    ERR level on a non-zero exit status, at DEBUG level otherwise.
    """
    # On Python 2, a unicode command would be encoded with the locale's
    # codec when exec'ed (ASCII under the C locale), so pass UTF-8 bytes.
    cmd_arg = cmd if isinstance(cmd, str) else cmd.encode('utf8')
    p = Popen(cmd_arg, stdin=PIPE, stdout=PIPE, stderr=STDOUT, shell=True)
    # Commands may emit arbitrary bytes; don't let bad UTF-8 kill the daemon.
    output = p.communicate(input.encode('utf8'))[0].decode('utf8', 'replace')
    if p.returncode != 0:
        log(LOG_ERR, 'command failed: ' + cmd + '\n' + output)
    else:
        log(LOG_INFO, 'successfully executed ' + cmd)
        log(LOG_DEBUG, '===== output:\n' + output)


# Argparse utils

def AbsPath(next_type=None):
    """argparse type: make the path absolute, then apply next_type if given."""
    def f(s):
        p = abspath(s)
        if next_type is not None:
            return next_type(p)
        return p
    return f


class Apply(argparse.Action):
    """argparse action storing f(values[0]) instead of the raw value.

    Bind f with functools.partial and use together with nargs=1, e.g.
    action=partial(Apply, log_levels.index).
    """

    def __init__(self, f, *args, **kwargs):
        # Name the class explicitly: super(self.__class__, ...) recurses
        # forever if Apply is ever subclassed.
        super(Apply, self).__init__(**kwargs)
        self.f = f

    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, self.f(values[0]))


def Directory(s):
    """argparse type: accept s only if it can be listed as a directory."""
    try:
        os.listdir(s)
        return s
    except OSError as e:
        raise argparse.ArgumentTypeError(str(e))


def File(flags):
    """argparse type: open the path with os.open(flags) as a writable file."""
    def f(s):
        try:
            return os.fdopen(os.open(s, flags), 'w')
        except OSError as e:
            raise argparse.ArgumentTypeError(str(e))
    return f


class First(argparse.Action):
    """argparse action storing only the first value (for use with nargs=1)."""

    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, values[0])


def FirstOf(*types, **kwargs):
    """argparse type: return the result of the first of types accepting s.

    If every type rejects s, fail with kwargs['error'] (a str.format template
    receiving s).
    """
    error = kwargs.get('error', 'argument "{}" is not valid')

    def f(s):
        for t in types:
            try:
                return t(s)
            # The exceptions argparse itself treats as "invalid value", plus
            # I/O errors; anything else is a real bug and should propagate.
            except (argparse.ArgumentTypeError, TypeError, ValueError, EnvironmentError):
                continue
        raise argparse.ArgumentTypeError(error.format(s))
    return f


# Main

if __name__ == '__main__':
    p = argparse.ArgumentParser()
    p.add_argument('config',
                   type=FirstOf(AbsPath(argparse.FileType('r')), AbsPath(Directory),
                                error='"{}" is neither a file nor a directory'),
                   help='either a file or a directory')
    p.add_argument('state_file', type=argparse.FileType('a+'),
                   help='e.g. /var/lib/feed-push/state')
    p.add_argument('--flood', default=False, action='store_true',
                   help='push all articles on startup instead of ignoring the ones older than 24h (useful for debugging)')
    p.add_argument('--fork', metavar='pid-file', nargs=1,
                   type=File(os.O_WRONLY | os.O_CREAT | os.O_EXCL), action=First,
                   help='useful in init scripts')
    p.add_argument('--log-level', nargs=1, default=1, choices=log_levels,
                   action=partial(Apply, log_levels.index),
                   help='default is INFO')
    p.add_argument('--syslog', default=False, action='store_true',
                   help='log to syslog instead of stderr')
    global_args = p.parse_args()

    if global_args.fork:
        pid = os.fork()
        if pid != 0:
            # Parent: record the child's pid in the pid file and leave.
            global_args.fork.write(str(pid))
            global_args.fork.close()
            sys.exit(0)

    if global_args.syslog:
        openlog(facility=LOG_DAEMON)

    # state['id_cache']: feed path -> ids of the entries seen on last parse.
    state = {'id_cache': {}}
    saved_state = global_args.state_file.read().strip()
    if saved_state:
        state.update(json.loads(saved_state))
    del saved_state

    monitor = gamin.WatchMonitor()
    path_to_feed_fd = {}    # absolute feed path -> open feed file
    path_to_config_fd = {}  # absolute config path -> open config file
    # config file name -> {absolute feed path: [shell command, ...]}
    config_to_feed_paths_to_commands = {}

    if hasattr(global_args.config, 'read'):
        os.chdir(os.path.dirname(global_args.config.name))
        monitor.watch_file(global_args.config.name, handle_config_change)
    else:
        os.chdir(global_args.config)
        monitor.watch_directory(global_args.config, handle_config_change)

    while True:
        monitor.handle_one_event()