Mercurial > feed-push
view feed-push @ 23:5bb7c2939da0 draft
add some debug logging
Signed-off-by: Changaco <changaco ατ changaco δοτ net>
author | Changaco <changaco ατ changaco δοτ net> |
---|---|
date | Sat, 04 Aug 2012 16:57:38 +0200 |
parents | b763ca084088 |
children | 96e2d5ffcd23 |
line wrap: on
line source
#!/usr/bin/env python2
# -*- coding: utf-8 -*-

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import argparse
import calendar
from functools import partial, reduce
from glob import glob
import json
import os
from os.path import abspath, dirname, isdir
import shlex
from subprocess import Popen, PIPE, STDOUT
import sys
from syslog import *
import time
import traceback

import feedparser


# Constants

# Level names ordered from least to most severe.  The index in this list is
# `7 - <syslog priority>` (syslog numbers priorities the other way round).
log_levels = ['DEBUG', 'INFO', 'NOTICE', 'WARNING', 'ERR', 'CRIT', 'ALERT', 'EMERG']

# Human readable names of the gamin event codes, used in debug messages.
gamin_events = {
    1: 'GAMChanged', 2: 'GAMDeleted', 3: 'GAMStartExecuting',
    4: 'GAMStopExecuting', 5: 'GAMCreated', 6: 'GAMMoved',
    7: 'GAMAcknowledge', 8: 'GAMExists', 9: 'GAMEndExist'
}


# Generic utils

def concat(l):
    """Flatten an iterable of iterables into a single new list."""
    return [item for sub in l for item in sub]


def dict_append(d, k, v):
    """Append `v` to the list stored under `d[k]`, creating it if needed."""
    d.setdefault(k, []).append(v)


# Logging

def log(*args):
    """Log a message, either as `log(msg)` or `log(priority, msg)`.

    `priority` is one of the syslog `LOG_*` constants and defaults to
    `LOG_INFO`.  Messages below `global_args.log_level` are dropped.  The
    message goes to syslog when running as a daemon (`--fork`), to stderr
    otherwise.  Any other number of arguments is silently ignored.  If the
    message cannot be emitted the program exits with status 1.
    """
    if len(args) == 1:
        priority, msg = LOG_INFO, args[0]
    elif len(args) == 2:
        priority, msg = args
    else:
        return
    # Index into `log_levels`; only used for filtering and for the stderr
    # prefix.  syslog() itself must receive the untouched priority.
    level = 7 - priority
    if level < global_args.log_level:
        return
    if isinstance(msg, unicode):
        msg = msg.encode('utf8')
    try:
        if global_args.fork:
            syslog(priority, msg)
        else:
            sys.stderr.write(log_levels[level] + ': ' + msg + '\n')
    except Exception:
        # Nowhere left to report the problem to.
        sys.exit(1)


def ignore_event(path, event):
    """Emit a debug message saying that `event` on `path` was ignored."""
    log(LOG_DEBUG, 'ignoring event ' + gamin_events.get(event, str(event)) + ' on ' + path)


# Config parsing

def parse_config_file(config_fd):
    """(Re)parse an open config file and start watching the feeds it lists.

    Lines starting with `%` are shell commands; consecutive command lines
    are joined with `; `.  Every other non-empty, non-comment line is a glob
    pattern of feed files to which the preceding commands apply.  The result
    replaces the entry of `config_to_feed_paths_to_commands` keyed by the
    file's name.  Parsing stops at the first feed line that has no command
    before it.
    """
    feeds_paths = config_to_feed_paths_to_commands[config_fd.name] = {}
    cmd = []
    log('parsing config file ' + config_fd.name)
    config_fd.seek(0)
    for lineno, line in enumerate(config_fd, 1):
        line = line.strip()
        if not line or line[0] == '#':
            continue
        if line[0] == '%':
            # A joined (string) command means the previous command block was
            # already used by some feed lines: start a new block.
            if isinstance(cmd, str):
                cmd = []
            cmd.append(line[1:].rstrip(';'))
        elif not cmd:
            log(LOG_ERR, 'missing command in file ' + config_fd.name + ' before line ' + str(lineno))
            return
        else:
            # Join only once: several feed lines may follow the same command
            # block, and joining an already joined string would insert '; '
            # between each of its characters.
            if isinstance(cmd, list):
                cmd = '; '.join(cmd)
            for feed_path in glob(line):
                feed_path = abspath(feed_path)
                dict_append(feeds_paths, feed_path, cmd)
                if feed_path not in path_to_feed_fd:
                    monitor.watch_file(feed_path, handler(handle_feed_change))
                    log('now watching ' + feed_path)


# Gamin callbacks

def handler(f):
    """Wrap callback `f(path, event)` so that its exceptions are logged.

    The returned callable logs the traceback at CRIT level instead of
    letting the exception escape into the monitor.
    """
    def g(path, event):
        try:
            f(path, event)
        except:
            # Deliberately broad: nothing is allowed to escape a callback.
            log(LOG_CRIT, traceback.format_exc())
    return g


def handle_config_change(path, event):
    """Dispatch a gamin event received for the config file or directory."""
    path = abspath(path)
    if os.path.isdir(path):
        ignore_event(path, event)
    elif path not in path_to_config_fd:
        open_config(path, event)
    elif event in [gamin.GAMChanged, gamin.GAMDeleted]:
        update_config(path, event)
    else:
        ignore_event(path, event)


def open_config(path, event):
    """Open and parse a config file that is not known yet.

    Only GAMCreated and GAMExists events are acted upon.  When a directory is
    being watched, files that are hidden or do not end in `.conf` are
    skipped.
    """
    if event not in [gamin.GAMCreated, gamin.GAMExists]:
        return ignore_event(path, event)
    # `config` is an open file when a single file was given on the command
    # line; in that case its name is accepted whatever it is.
    watching_directory = not hasattr(global_args.config, 'read')
    name = os.path.basename(path)
    if watching_directory and (not name.endswith('.conf') or name.startswith('.')):
        return log('ignoring ' + path + ' (not a valid config file name)')
    try:
        config_fd = open(path)
    except IOError as e:
        return log('failed to open "' + path + '": ' + str(e))
    path_to_config_fd[path] = config_fd
    parse_config_file(config_fd)


def _all_feed_paths():
    """Return the set of feed paths referenced by any loaded config file."""
    return set(concat(d.keys() for d in config_to_feed_paths_to_commands.values()))


def update_config(path, event):
    """Handle the modification or deletion of an already loaded config file.

    After reparsing (GAMChanged) or forgetting (GAMDeleted) the file, feeds
    that are no longer referenced by any config file stop being watched and
    their file objects are closed.
    """
    old_feed_paths = _all_feed_paths()
    if event == gamin.GAMChanged:
        parse_config_file(path_to_config_fd[path])
    elif event == gamin.GAMDeleted:
        log('removing actions from deleted config file ' + path)
        config_to_feed_paths_to_commands.pop(path, None)
        path_to_config_fd.pop(path).close()
    for feed_path in old_feed_paths.difference(_all_feed_paths()):
        monitor.stop_watch(feed_path)
        log('stopped watching ' + feed_path)
        if feed_path in path_to_feed_fd:
            path_to_feed_fd.pop(feed_path).close()


def handle_feed_change(path, event):
    """Handle a gamin event on a feed file and run commands for new entries.

    An entry is new when its id is not in the id cache of the feed and,
    unless `--flood` was given, when it was published less than 24 hours
    ago.  For every new entry each command attached to the feed is run with
    the value of the entry's first content element on its standard input.
    The id cache is then replaced by the ids currently in the feed and
    saved.
    """
    if path not in path_to_feed_fd:
        if event in [gamin.GAMCreated, gamin.GAMExists, gamin.GAMChanged]:
            try:
                path_to_feed_fd[path] = open(path)
            except IOError as e:
                return log('failed to open "' + path + '": ' + str(e))
            # The file is now registered: process it as a regular change.
            handle_feed_change(path, gamin.GAMChanged)
        else:
            ignore_event(path, event)
    elif event == gamin.GAMChanged:
        feed_fd = path_to_feed_fd[path]
        feed_fd.seek(0)
        feed = feedparser.parse(feed_fd.read())
        known_ids = set(state['id_cache'].get(feed_fd.name, []))
        oldest_allowed = time.time() - 86400
        new_entries = 0
        # Entries are walked in reverse document order.
        for entry in reversed(feed.entries):
            if entry.id in known_ids:
                continue
            if not global_args.flood and calendar.timegm(entry.published_parsed) < oldest_allowed:
                continue
            new_entries += 1
            for feed_path_to_commands in config_to_feed_paths_to_commands.values():
                for cmd in feed_path_to_commands.get(path, []):
                    run_command(format_cmd(cmd, feed=feed.feed, entry=entry), entry.content[0].value)
        state['id_cache'][feed_fd.name] = [entry.id for entry in feed.entries]
        save_state()
        if new_entries == 0:
            log(LOG_DEBUG, 'received GAMChanged event on "%s" but no new entry was found' % path)
    elif event == gamin.GAMDeleted:
        path_to_feed_fd.pop(path).close()
    else:
        ignore_event(path, event)


def save_state():
    """Overwrite the state file with the JSON dump of `state`."""
    global_args.state_file.truncate(0)
    json.dump(state, global_args.state_file)
    global_args.state_file.flush()


# Commands utils

def format_cmd(cmd, **kwargs):
    """The safe equivalent of str.format() for shell commands, meaning
    interpolated variables can't do shell injections (I hope).

    The command is split with shlex; every token changed by `.format()` is
    wrapped in single quotes (embedded single quotes are escaped), the other
    tokens are emitted as they come out of shlex.  Returns a unicode string.
    """
    # shlex is fed bytes; only encode when we were actually given unicode,
    # otherwise Python 2 would first try to decode the bytes as ASCII.
    if isinstance(cmd, unicode):
        cmd = cmd.encode('utf8')
    parts = []
    for arg in shlex.split(cmd):
        raw = arg.decode('utf8')
        formatted = raw.format(**kwargs)
        if raw != formatted:
            parts.append(u"'" + formatted.replace(u"'", u'\'"\'"\'') + u"'")
        else:
            # Use the decoded token: mixing the byte string into a unicode
            # result fails on non-ASCII characters.
            parts.append(raw)
    return u' '.join(parts)


def run_command(cmd, input):
    """Run `cmd` through the shell, feeding it `input` (unicode) on stdin.

    stderr is merged into stdout.  A non-zero exit status is logged as an
    error together with the output; on success the output is only logged at
    debug level.
    """
    # Be explicit about the encoding instead of depending on the locale.
    shell_cmd = cmd.encode('utf8') if isinstance(cmd, unicode) else cmd
    p = Popen(shell_cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT, shell=True)
    # 'replace' so that a command printing invalid UTF-8 cannot raise here.
    output = p.communicate(input.encode('utf8'))[0].decode('utf8', 'replace')
    if p.returncode != 0:
        log(LOG_ERR, 'command failed: ' + cmd + '\n' + output)
    else:
        log(LOG_INFO, 'successfully executed ' + cmd)
        log(LOG_DEBUG, '===== output:\n' + output)


# Argparse utils

def AbsPath(next_type=None):
    """Argparse type making its argument absolute before calling `next_type`.

    Without `next_type` the absolute path itself is returned.
    """
    def f(s):
        p = abspath(s)
        if next_type is not None:
            return next_type(p)
        return p
    return f


class Apply(argparse.Action):
    """Argparse action storing `f(values[0])` in the namespace."""

    def __init__(self, f, *args, **kwargs):
        # Name the class explicitly: super(self.__class__, ...) recurses
        # forever as soon as the class is subclassed.
        super(Apply, self).__init__(*args, **kwargs)
        self.f = f

    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, self.f(values[0]))


def MakeDirs(next_type=None):
    """Argparse type creating the parent directory of its argument.

    The argument is then passed to `next_type` if given, returned unchanged
    otherwise.  A failure to create the directory is reported as an
    `argparse.ArgumentTypeError`.
    """
    def f(s):
        d = dirname(s)
        if d and not isdir(d):
            try:
                os.makedirs(d)
            except OSError as e:
                raise argparse.ArgumentTypeError(str(e))
        if next_type is not None:
            return next_type(s)
        return s
    return f


def Directory(s):
    """Argparse type accepting only paths that can be listed as directories."""
    try:
        os.listdir(s)
    except OSError as e:
        raise argparse.ArgumentTypeError(str(e))
    return s


def File(flags):
    """Argparse type opening its argument for writing with `os.open` flags."""
    def f(s):
        try:
            return os.fdopen(os.open(s, flags), 'w')
        except OSError as e:
            raise argparse.ArgumentTypeError(str(e))
    return f


class First(argparse.Action):
    """Argparse action storing the first value of a `nargs=1` option."""

    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, values[0])


def FirstOf(*types, **kwargs):
    """Argparse type returning the result of the first of `types` that works.

    The optional keyword argument `error` is the message template (formatted
    with the offending argument) of the `argparse.ArgumentTypeError` raised
    when every type fails.
    """
    error = kwargs.get('error', 'argument "{}" is not valid')

    def f(s):
        for t in types:
            try:
                return t(s)
            except Exception:
                # Not this one, try the next type.
                pass
        raise argparse.ArgumentTypeError(error.format(s))
    return f


# Main

if __name__ == '__main__':

    p = argparse.ArgumentParser()
    p.add_argument('config',
                   type=FirstOf(AbsPath(argparse.FileType('r')), AbsPath(Directory),
                                error='"{}" is neither a file nor a directory'),
                   help='either a file or a directory')
    p.add_argument('state_file', type=MakeDirs(argparse.FileType('a+')),
                   help='e.g. /var/lib/feed-push/state')
    p.add_argument('--flood', default=False, action='store_true',
                   help='push all articles on startup instead of ignoring the ones older than 24h (useful for debugging)')
    p.add_argument('--fork', metavar='pid-file', nargs=1,
                   type=File(os.O_WRONLY|os.O_CREAT|os.O_EXCL), action=First,
                   help='daemonize and log to syslog')
    p.add_argument('--log-level', nargs=1, default=1, choices=log_levels,
                   action=partial(Apply, log_levels.index), help='default is INFO')
    global_args = p.parse_args()

    if global_args.fork:
        # Double fork; the intermediate process records the daemon's pid.
        pid = os.fork()
        if pid != 0:
            sys.exit(0)
        os.setsid()
        pid = os.fork()
        if pid != 0:
            global_args.fork.write(str(pid))
            global_args.fork.close()
            sys.exit(0)
        openlog(facility=LOG_DAEMON)
        # Point the standard streams at /dev/null.
        null = open('/dev/null', 'r+')
        for f in [sys.stdin, sys.stdout, sys.stderr]:
            f.flush()
            os.dup2(null.fileno(), f.fileno())

    state = {'id_cache': {}}
    # The file is opened in append mode: make sure we read from the start.
    global_args.state_file.seek(0)
    saved_state = global_args.state_file.read().strip()
    if saved_state:
        state.update(json.loads(saved_state))
    del saved_state

    # Imported only when run as a script; the callbacks above find it in the
    # module globals.
    import gamin
    monitor = gamin.WatchMonitor()

    path_to_feed_fd = {}
    path_to_config_fd = {}
    config_to_feed_paths_to_commands = {}

    # Work from the config directory: feed globs are expanded relative to
    # the current directory.
    if hasattr(global_args.config, 'read'):
        os.chdir(os.path.dirname(global_args.config.name))
        monitor.watch_file(global_args.config.name, handler(handle_config_change))
    else:
        os.chdir(global_args.config)
        monitor.watch_directory(global_args.config, handler(handle_config_change))

    if global_args.fork:
        while True:
            monitor.handle_one_event()
    else:
        try:
            while True:
                monitor.handle_events()
                time.sleep(1)
        except KeyboardInterrupt:
            sys.exit(0)