Mercurial > feed-push
view feed-push @ 34:030e8b24f8b7 draft default tip master
pretty print JSON in state file
Signed-off-by: Changaco <changaco ατ changaco δοτ net>
author | Changaco <changaco ατ changaco δοτ net> |
---|---|
date | Sun, 05 Aug 2012 15:18:28 +0200 |
parents | adde3e451237 |
children |
line wrap: on
line source
#!/usr/bin/env python2
# -*- coding: utf-8 -*-

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Watch feed files with gamin and run shell commands for each new entry.

The config (a single file, or a directory of ``*.conf`` files) maps feed file
globs to shell commands. The ids of the entries already pushed are kept in a
JSON state file so that they are not pushed twice.
"""

import argparse
import calendar
from functools import partial, reduce
from glob import glob
import json
import os
from os.path import abspath, dirname, isdir
import shlex
from subprocess import Popen, PIPE, STDOUT
import sys
from syslog import *
import time
import traceback

import feedparser


# Constants

# Indexed by verbosity level, i.e. in the reverse order of syslog priorities.
log_levels = ['DEBUG', 'INFO', 'NOTICE', 'WARNING', 'ERR', 'CRIT', 'ALERT', 'EMERG']

# Human-readable names of the gamin event codes, used in debug messages only.
gamin_events = {
    1: 'GAMChanged', 2: 'GAMDeleted', 3: 'GAMStartExecuting',
    4: 'GAMStopExecuting', 5: 'GAMCreated', 6: 'GAMMoved',
    7: 'GAMAcknowledge', 8: 'GAMExists', 9: 'GAMEndExist'
}


# Generic utils

def concat(l):
    """Concatenate an iterable of lists into a single new list."""
    return reduce(list.__add__, l, [])


def dict_append(d, k, v):
    """Append `v` to the list stored in `d[k]`, creating the list if needed."""
    d.setdefault(k, []).append(v)


# Logging

def log(*args):
    """Log a message to syslog (when forked) or to stderr.

    Call as ``log(msg)``, in which case the priority is ``LOG_INFO``, or as
    ``log(priority, msg)`` where `priority` is a syslog ``LOG_*`` constant.
    Any other number of arguments is silently ignored. Messages below the
    level selected with ``--log-level`` are dropped. The program exits with
    status 1 if the message cannot be written.
    """
    if len(args) == 1:
        priority, msg = LOG_INFO, args[0]
    elif len(args) == 2:
        priority, msg = args
    else:
        return
    # syslog priorities go from LOG_EMERG (0) to LOG_DEBUG (7) whereas
    # `log_levels` is indexed the other way round, hence the inversion. The
    # inverted value is only used for filtering and for the stderr prefix:
    # syslog() itself must receive the original priority.
    level = 7 - priority
    if level < global_args.log_level:
        return
    if isinstance(msg, unicode):
        msg = msg.encode('utf8')
    try:
        if global_args.fork:
            syslog(priority, msg)
        else:
            sys.stderr.write(log_levels[level]+': '+msg+'\n')
    except Exception:
        # There is nowhere left to report errors to.
        sys.exit(1)


def ignore_event(path, event):
    """Log, at debug level, that `event` on `path` is being ignored."""
    log(LOG_DEBUG, 'ignoring event '+gamin_events.get(event, str(event))+' on '+path)


# Config parsing

def parse_config_file(config_path):
    """(Re)load a config file and start watching the feeds it lists.

    Blank lines and lines starting with ``#`` are skipped. Lines starting
    with ``%`` are shell commands; consecutive command lines are joined with
    ``'; '``. Any other line is a glob pattern of feed files to which the
    preceding group of commands applies. Parsing stops at the first feed line
    that is not preceded by any command.
    """
    try:
        config_fd = open(config_path)
    except IOError as e:
        return log('failed to open "'+config_path+'": '+str(e))
    # Replaces whatever was previously loaded from this config file.
    feeds_paths = config_to_feed_paths_to_commands[config_path] = {}
    # `cmd` is a list while command lines are being accumulated, and becomes
    # the joined string once the first feed line of the group is reached.
    cmd = []
    log('parsing config file '+config_path)
    with config_fd:
        for i, line in enumerate(config_fd, 1):
            line = line.strip()
            if not line or line[0] == '#':
                continue
            if line[0] == '%':
                if isinstance(cmd, str):
                    # First command after some feed lines: start a new group.
                    cmd = []
                cmd.append(line[1:].rstrip(';'))
            elif not cmd:
                log(LOG_ERR, 'missing command in file '+config_path+' before line '+str(i))
                return
            else:
                if isinstance(cmd, list):
                    # Join only once; several feed lines may share the group.
                    cmd = '; '.join(cmd)
                for feed_path in glob(line):
                    feed_path = abspath(feed_path)
                    dict_append(feeds_paths, feed_path, cmd)
                    if feed_path not in watched_feeds:
                        monitor.watch_file(feed_path, handler(handle_feed_change))
                        watched_feeds.add(feed_path)
                        log('now watching '+feed_path)


# Gamin callbacks

def handler(f):
    """Wrap the callback `f` so that its exceptions are logged, not raised."""
    def g(path, event):
        try:
            f(path, event)
        except Exception:
            log(LOG_CRIT, traceback.format_exc())
    return g


def handle_config_change(path, event):
    """Dispatch a gamin event received for the config file or directory."""
    path = abspath(path)
    if os.path.isdir(path):
        ignore_event(path, event)
    elif path not in config_to_feed_paths_to_commands:
        open_config(path, event)
    elif event in [gamin.GAMChanged, gamin.GAMDeleted]:
        update_config(path, event)
    else:
        ignore_event(path, event)


def open_config(path, event):
    """Load a config file that is not known yet.

    When the program watches a directory, only non-hidden files whose name
    ends with ``.conf`` are loaded. A config file given explicitly on the
    command line is loaded whatever its name is.
    """
    if event not in [gamin.GAMCreated, gamin.GAMExists]:
        return ignore_event(path, event)
    single_file_mode = hasattr(global_args.config, 'read')
    # `path` is absolute, so the hidden-file test has to look at the basename.
    is_hidden = os.path.basename(path).startswith('.')
    if (not path.endswith('.conf') or is_hidden) and not single_file_mode:
        return log('ignoring '+path+' (not a valid config file name)')
    parse_config_file(path)


def update_config(path, event):
    """Reload or forget a known config file, then drop the orphaned watches."""
    feeds_paths = set(concat(d.keys() for d in config_to_feed_paths_to_commands.values()))
    if event == gamin.GAMChanged:
        parse_config_file(path)
    elif event == gamin.GAMDeleted:
        log('removing actions from deleted config file '+path)
        config_to_feed_paths_to_commands.pop(path)
    new_feeds_paths = set(concat(d.keys() for d in config_to_feed_paths_to_commands.values()))
    # Feeds that no config file references any more.
    for feed_path in feeds_paths.difference(new_feeds_paths):
        monitor.stop_watch(feed_path)
        watched_feeds.discard(feed_path)
        log('stopped watching '+feed_path)


def handle_feed_change(path, event):
    """Run the configured commands for every new entry of the feed at `path`.

    An entry is new when its id is not in the id cache of the feed. Unless
    ``--flood`` was given, entries published more than 24 hours ago are
    skipped as well. Each command receives the value of the first content
    element of the entry on its standard input.
    """
    if event not in [gamin.GAMCreated, gamin.GAMExists, gamin.GAMChanged]:
        return ignore_event(path, event)
    try:
        feed_fd = open(path)
    except IOError as e:
        return log('failed to open "'+path+'": '+str(e))
    with feed_fd:
        feed = feedparser.parse(feed_fd.read())

    id_cache = state['id_cache']
    feed_id = feed.feed.get('link', path)
    if feed_id != path and path in id_cache:
        # The ids were cached under the file path; move them under the link.
        id_cache[feed_id] = id_cache.pop(path)

    seen_ids = set(id_cache.get(feed_id, []))
    oldest_allowed = time.time() - 86400
    new_entries = 0
    for entry in reversed(feed.entries):
        if entry.id in seen_ids:
            continue
        if not global_args.flood and calendar.timegm(entry.published_parsed) < oldest_allowed:
            continue
        new_entries += 1
        for feed_path_to_commands in config_to_feed_paths_to_commands.values():
            for cmd in feed_path_to_commands.get(path, []):
                run_command(format_cmd(cmd, feed=feed.feed, entry=entry), entry.content[0].value)

    # Remember every entry currently in the feed, including the skipped ones.
    id_cache[feed_id] = [entry.id for entry in feed.entries]
    save_state()
    if new_entries == 0:
        log('no new entry in %s' % path)


def save_state():
    """Overwrite the state file with the JSON serialization of `state`."""
    state_file = global_args.state_file
    state_file.truncate(0)
    if global_args.pretty:
        json.dump(state, state_file, indent=4)
    else:
        json.dump(state, state_file)
    state_file.flush()


# Commands utils

def format_cmd(cmd, **kwargs):
    """The safe equivalent of str.format() for shell commands, meaning
    interpolated variables can't do shell injections (I hope).

    `cmd` is split into shell words. Every word that is changed by
    ``str.format(**kwargs)`` is replaced by its formatted value wrapped in
    single quotes; the other words are emitted as they were split. Returns
    the resulting command as a unicode string.
    """
    # shlex is given bytes; only encode when `cmd` is not bytes already,
    # otherwise Python 2 would first decode it implicitly as ASCII.
    if isinstance(cmd, unicode):
        cmd = cmd.encode('utf8')
    words = []
    for arg in shlex.split(cmd):
        a = arg.decode('utf8')
        b = a.format(**kwargs)
        if a != b:
            # Close the quote, emit an escaped quote, reopen the quote.
            words.append(u"'" + b.replace(u"'", u'\'"\'"\'') + u"'")
        else:
            # Use the decoded word: mixing bytes into the unicode result
            # would trigger an implicit ASCII decoding.
            words.append(a)
    return u' '.join(words).lstrip()


def run_command(cmd, input):
    """Run `cmd` through the shell with `input` on stdin and log the outcome.

    The standard output and standard error of the command are captured
    together and included in the log messages.
    """
    p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT, shell=True)
    # The output is only logged, so undecodable bytes are replaced rather
    # than allowed to raise.
    output = p.communicate(input.encode('utf8'))[0].decode('utf8', 'replace')
    if p.returncode != 0:
        log(LOG_ERR, 'command failed: '+cmd+'\n'+output)
    else:
        log(LOG_INFO, 'successfully executed '+cmd)
        log(LOG_DEBUG, '===== output:\n'+output)


# Argparse utils

def AbsPath(next_type=None):
    """Return an argparse type that makes the path absolute.

    The absolute path is then passed to `next_type`, if given.
    """
    def f(s):
        p = abspath(s)
        if next_type is not None:
            return next_type(p)
        return p
    return f


class Apply(argparse.Action):
    """Argparse action storing ``f(values[0])`` in the namespace."""

    def __init__(self, f, *args, **kwargs):
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as the class is subclassed.
        super(Apply, self).__init__(**kwargs)
        self.f = f

    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, self.f(values[0]))


def MakeDirs(next_type=None):
    """Return an argparse type that creates the parent directory of the path.

    The path is then passed to `next_type`, if given. A failure to create
    the directory is reported as an ``argparse.ArgumentTypeError``.
    """
    def f(s):
        d = dirname(s)
        if d and not isdir(d):
            try:
                os.makedirs(d)
            except OSError as e:
                raise argparse.ArgumentTypeError(str(e))
        if next_type is not None:
            return next_type(s)
        return s
    return f


def Directory(s):
    """Argparse type accepting only the path of a listable directory."""
    try:
        os.listdir(s)
    except OSError as e:
        raise argparse.ArgumentTypeError(str(e))
    return s


def File(flags):
    """Return an argparse type opening the path for writing with `flags`.

    `flags` are the ``os.O_*`` flags passed to ``os.open()``.
    """
    def f(s):
        try:
            return os.fdopen(os.open(s, flags), 'w')
        except OSError as e:
            raise argparse.ArgumentTypeError(str(e))
    return f


class First(argparse.Action):
    """Argparse action storing the first of the parsed values."""

    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, values[0])


def FirstOf(*types, **kwargs):
    """Return an argparse type trying each of `types` in turn.

    The result of the first type that does not raise is returned. If they all
    fail, an ``argparse.ArgumentTypeError`` is raised with the message given
    by the `error` keyword argument, formatted with the rejected string.
    """
    error = kwargs.get('error', 'argument "{}" is not valid')

    def f(s):
        for t in types:
            try:
                return t(s)
            except Exception:
                # Not this type, try the next one.
                pass
        raise argparse.ArgumentTypeError(error.format(s))
    return f


# Main

if __name__ == '__main__':

    p = argparse.ArgumentParser()
    p.add_argument('config',
                   type=FirstOf(AbsPath(argparse.FileType('r')), AbsPath(Directory),
                                error='"{}" is neither a file nor a directory'),
                   help='either a file or a directory')
    p.add_argument('state_file', type=MakeDirs(argparse.FileType('a+')),
                   help='e.g. /var/lib/feed-push/state')
    p.add_argument('--flood', default=False, action='store_true',
                   help='push all articles on startup instead of ignoring the ones older than 24h (useful for debugging)')
    p.add_argument('--fork', metavar='pid-file', nargs=1,
                   type=File(os.O_WRONLY|os.O_CREAT|os.O_EXCL), action=First,
                   help='daemonize and log to syslog')
    # The level is stored as its index in `log_levels`; 1 is INFO.
    p.add_argument('--log-level', nargs=1, default=1, choices=log_levels,
                   action=partial(Apply, log_levels.index),
                   help='default is INFO')
    p.add_argument('--no-pretty', dest='pretty', default=True, action='store_false',
                   help='don\'t pretty print JSON in state file')
    global_args = p.parse_args()

    if global_args.fork:
        # Double fork; the intermediate process records the pid of the daemon.
        pid = os.fork()
        if pid != 0:
            sys.exit(0)
        os.setsid()
        pid = os.fork()
        if pid != 0:
            global_args.fork.write(str(pid))
            global_args.fork.close()
            sys.exit(0)
        openlog(facility=LOG_DAEMON)
        # Redirect the standard streams to /dev/null.
        null = open('/dev/null', 'r+')
        for f in [sys.stdin, sys.stdout, sys.stderr]:
            f.flush()
            os.dup2(null.fileno(), f.fileno())

    state = {'id_cache': {}}
    # The file is opened in 'a+' mode, whose initial read position is not the
    # same on every platform: rewind explicitly before reading.
    global_args.state_file.seek(0)
    saved_state = global_args.state_file.read().strip()
    if saved_state:
        state.update(json.loads(saved_state))
    del saved_state

    import gamin
    monitor = gamin.WatchMonitor()
    watched_feeds = set()
    config_to_feed_paths_to_commands = {}

    # `config` is an open file when a single config file was given, and a
    # path when a directory was given.
    if hasattr(global_args.config, 'read'):
        os.chdir(os.path.dirname(global_args.config.name))
        monitor.watch_file(global_args.config.name, handler(handle_config_change))
    else:
        os.chdir(global_args.config)
        monitor.watch_directory(global_args.config, handler(handle_config_change))

    if global_args.fork:
        while True:
            monitor.handle_one_event()
    else:
        try:
            while True:
                monitor.handle_events()
                time.sleep(1)
        except KeyboardInterrupt:
            sys.exit(0)