import zmq, json from config import config def EventSubscriber(app, environ, start_response, path): session = 'session-' + environ['sessionid'] context = zmq.Context() socket = context.socket(zmq.SUB) socket.connect(config.get('event_subscriber')) socket.setsockopt(zmq.SUBSCRIBE, 'cached') socket.setsockopt(zmq.SUBSCRIBE, 'recoding') socket.setsockopt(zmq.SUBSCRIBE, session) start_response('200 OK', [('Content-Type', 'text/event-stream')]) yield ': event source stream\n\n' poller = zmq.Poller() poller.register(socket, zmq.POLLIN) while True: socks = dict(poller.poll(config.getint('event_timeout') * 1000)) if not socket in socks: break message = socket.recv() address, message = message.split(None, 1) # split session-specific messages if address == session: address, message = message.split(None, 1) data = None if address in ('cached', 'recoding'): track, path = message.split(None, 1) data = json.dumps({'type': address, 'path': path, 'track': None if track == '_' else track}) yield 'data: {0}\n\n'.format(data) elif address in ('play',): data = json.dumps({'type': address, 'path': message}) if data: yield 'data: {0}\n\n'.format(data) socket.close() class EventPublisher(object): def __init__(self): self.context = zmq.Context() self.socket = self.context.socket(zmq.PUB) self.socket.bind(config.get('event_publisher')) def recoding(self, path, track): self.socket.send('recoding {0} {1}'.format(track or '_', path)) def cached(self, path, track): self.socket.send('cached {0} {1}'.format(track or '_', path)) def play(self, session, path): self.socket.send('session-{0} play {1}'.format(session, path)) event_pub = EventPublisher()