summaryrefslogtreecommitdiff
path: root/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'events.py')
-rw-r--r--events.py39
1 files changed, 39 insertions, 0 deletions
diff --git a/events.py b/events.py
new file mode 100644
index 0000000..4759a19
--- /dev/null
+++ b/events.py
@@ -0,0 +1,39 @@
+import zmq, json
+from config import config
+
+def EventSubscriber(app, environ, start_response, path):
+ context = zmq.Context()
+ socket = context.socket(zmq.SUB)
+ socket.connect(config.get('event_subscriber'))
+ socket.setsockopt(zmq.SUBSCRIBE, 'cached')
+ socket.setsockopt(zmq.SUBSCRIBE, 'recoding')
+
+ start_response('200 OK', [('Content-Type', 'text/event-stream')])
+ yield ': event source stream\n\n'
+ poller = zmq.Poller()
+ poller.register(socket, zmq.POLLIN)
+ while True:
+ socks = dict(poller.poll(config.getint('event_timeout') * 1000))
+ if not socket in socks:
+ break
+ message = socket.recv()
+ address, message = message.split(None, 1)
+ if address in ('cached', 'recoding'):
+ data = json.dumps({'type': address, 'path': message})
+ yield 'data: {0}\n\n'.format(data)
+
+ socket.close()
+
+class EventPublisher(object):
+ def __init__(self):
+ self.context = zmq.Context()
+ self.socket = self.context.socket(zmq.PUB)
+ self.socket.bind(config.get('event_publisher'))
+
+ def recoding(self, path):
+ self.socket.send('recoding {0}'.format(path))
+
+ def cached(self, path):
+ self.socket.send('cached {0}'.format(path))
+
+event_pub = EventPublisher()