From 97d7c9014855449fe04162308feac66a35e007ea Mon Sep 17 00:00:00 2001 From: Jon Bergli Heier Date: Mon, 8 Aug 2011 00:11:01 +0200 Subject: Initial commit. app.py - WSGI application and handlers. config.py - Config helper class. directory.py - Directory and file helper classes. events.py - zeromq event publisher and subscriber. recode.py - Codecs and stuff static/ - Web interface --- events.py | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 events.py (limited to 'events.py') diff --git a/events.py b/events.py new file mode 100644 index 0000000..4759a19 --- /dev/null +++ b/events.py @@ -0,0 +1,39 @@ +import zmq, json +from config import config + +def EventSubscriber(app, environ, start_response, path): + context = zmq.Context() + socket = context.socket(zmq.SUB) + socket.connect(config.get('event_subscriber')) + socket.setsockopt(zmq.SUBSCRIBE, 'cached') + socket.setsockopt(zmq.SUBSCRIBE, 'recoding') + + start_response('200 OK', [('Content-Type', 'text/event-stream')]) + yield ': event source stream\n\n' + poller = zmq.Poller() + poller.register(socket, zmq.POLLIN) + while True: + socks = dict(poller.poll(config.getint('event_timeout') * 1000)) + if not socket in socks: + break + message = socket.recv() + address, message = message.split(None, 1) + if address in ('cached', 'recoding'): + data = json.dumps({'type': address, 'path': message}) + yield 'data: {0}\n\n'.format(data) + + socket.close() + +class EventPublisher(object): + def __init__(self): + self.context = zmq.Context() + self.socket = self.context.socket(zmq.PUB) + self.socket.bind(config.get('event_publisher')) + + def recoding(self, path): + self.socket.send('recoding {0}'.format(path)) + + def cached(self, path): + self.socket.send('cached {0}'.format(path)) + +event_pub = EventPublisher() -- cgit v1.2.3