summary refs log tree commit diff
path: root/parsing.c
diff options
context:
space:
mode:
author Jon Bergli Heier <snakebite@jvnv.net> 2009-11-22 03:00:54 +0100
committer Jon Bergli Heier <snakebite@jvnv.net> 2009-11-22 03:00:54 +0100
commit 95f4c7cb4a9f0a30ab9d78852725a45688a97512 (patch)
tree c2dab013ed2769c31ca4b0233faa9cfa69224ab9 /parsing.c
parent 5d8561e4257f13756010daca1bb098bdef9cb97c (diff)
Added support for using pthreads.
Data processing (parsing) moved to parsing.c. The line parsing code is moved to its own function which is called from one or more threads (depends on the "threads" setting in the config file).
Diffstat (limited to 'parsing.c')
-rw-r--r-- parsing.c 168
1 files changed, 168 insertions, 0 deletions
diff --git a/parsing.c b/parsing.c
new file mode 100644
index 0000000..58a5303
--- /dev/null
+++ b/parsing.c
@@ -0,0 +1,168 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <wctype.h>
+#include <wchar.h>
+#include <pthread.h>
+
+#include "parsing.h"
+#include "channel.h"
+#include "user.h"
+#include "word.h"
+#include "export_xml.h"
+
+/* Element counts of the fixed-size buffers used while parsing one log line. */
+/* Holds a nick (or kick victim) copied out of a regex match. */
+#define NICK_BUFFER_SIZE 0x100
+/* Holds the message text of a line, and the wide/multibyte word buffers. */
+#define TEXT_BUFFER_SIZE 0x400
+/* Maximum number of bytes fgets() stores per read, including the terminator. */
+#define LINE_BUFFER_SIZE 0x400
+/* Holds the captured "hour" or "minute" field as a string. */
+#define TIME_BUFFER_SIZE 0xf
+
+/*
+ * user_mutex is taken around user_get() calls and word_mutex around
+ * word_get() calls in process_file(); both are initialised and destroyed
+ * by process().
+ */
+pthread_mutex_t user_mutex, word_mutex;
+
+/* Number of ints in the pcre_exec() output vector. */
+enum { OVECTOR_SIZE = 30 };
+
+/*
+ * Parse a decimal time field captured from a log line.
+ *
+ * Returns the parsed value when the string starts with a number in the
+ * range [0, max]; returns -1 when there are no digits or the value is out
+ * of range.
+ */
+static int parse_time_field(const char *s, long max) {
+    char *end;
+    long value = strtol(s, &end, 10);
+
+    if(end == s || value < 0 || value > max) {
+        return -1;
+    }
+    return (int)value;
+}
+
+/*
+ * Record one occurrence of a word in the word table.
+ *
+ * word must have room for a terminator at word[len]; it is terminated here,
+ * converted to a multibyte string and passed to word_get(). The lookup and
+ * the counter increment both happen under word_mutex so that concurrent
+ * parser threads cannot lose updates. A word that cannot be converted to
+ * the current locale's multibyte encoding is silently skipped.
+ */
+static void count_word(wchar_t *word, int len) {
+    char mbword[TEXT_BUFFER_SIZE];
+    size_t n;
+
+    word[len] = L'\0';
+    /* Leave room for the terminator: wcstombs() does not write one when the
+     * output fills the buffer. */
+    n = wcstombs(mbword, word, sizeof mbword - 1);
+    if(n == (size_t)-1) {
+        return;
+    }
+    mbword[n] = '\0';
+
+    pthread_mutex_lock(&word_mutex);
+    struct word_t *word_s = word_get(mbword);
+    word_s->count++;
+    pthread_mutex_unlock(&word_mutex);
+}
+
+/*
+ * Split a message into words and feed each one to count_word().
+ *
+ * A word is a run of alphabetic characters (lower-cased) that is ended by a
+ * blank or by the end of the text. Any other character discards the run
+ * collected so far, exactly as the original inline loop did.
+ *
+ * Stores the number of wide characters in the text in *characters and
+ * returns the number of words found. If the text is not a valid multibyte
+ * string, both are 0.
+ */
+static int tally_words(const char *text, size_t *characters) {
+    wchar_t wtext[TEXT_BUFFER_SIZE];
+    wchar_t word[TEXT_BUFFER_SIZE];
+    int words = 0;
+    int len = 0;
+
+    *characters = 0;
+    size_t wlen = mbstowcs(wtext, text, TEXT_BUFFER_SIZE - 1);
+    if(wlen == (size_t)-1) {
+        return 0;
+    }
+    /* mbstowcs() omits the terminator when the output is filled. */
+    wtext[wlen] = L'\0';
+    *characters = wlen;
+
+    for(const wchar_t *pos = wtext; *pos != L'\0'; pos++) {
+        if(iswblank(*pos)) {
+            if(len) {
+                words++;
+                count_word(word, len);
+            }
+            len = 0;
+        } else if(iswalpha(*pos)) {
+            word[len++] = towlower(*pos);
+        } else {
+            len = 0;
+        }
+    }
+    if(len) {
+        words++;
+        count_word(word, len);
+    }
+    return words;
+}
+
+/*
+ * Read log lines from f and update the statistics of channel.
+ *
+ * Each line is tried against the regexes in rs, in this order:
+ *   - text: counts the line in the quarter-hour slot (hour*4 + minute/15)
+ *           of both the user and the channel, and counts the characters
+ *           and words of the message;
+ *   - join: calls user_get() for the nick and discards the result
+ *           (presumably this registers the user; confirm against user.c);
+ *   - kick: increments the kicker's kicks and the victim's kicked counters.
+ * Lines matching none of the patterns are ignored.
+ *
+ * process() starts several threads that call this function on the same
+ * FILE and channel. Every update of shared counters is therefore done while
+ * holding user_mutex (user and channel counters) or word_mutex (word table).
+ * NOTE(review): this relies on fgets() locking the stream per call, as POSIX
+ * stdio does, so each read line is handed to exactly one thread.
+ */
+static void process_file(FILE *f, struct channel_t *channel, struct regexset_t *rs) {
+    char line[LINE_BUFFER_SIZE];
+
+    while(fgets(line, LINE_BUFFER_SIZE, f)) {
+        int ovector[OVECTOR_SIZE];
+        int line_len = (int)strlen(line);
+        int rc;
+
+        rc = pcre_exec(rs->text, rs->text_e, line, line_len, 0, 0, ovector, OVECTOR_SIZE);
+        if(rc > 0) {
+            char nick[NICK_BUFFER_SIZE], text[TEXT_BUFFER_SIZE];
+            char hour_s[TIME_BUFFER_SIZE], min_s[TIME_BUFFER_SIZE];
+            int time_i = -1;
+
+            /* Without a nick and a text there is nothing to account for. */
+            if(pcre_copy_named_substring(rs->text, line, ovector, rc, "nick", nick, NICK_BUFFER_SIZE) < 0
+                    || pcre_copy_named_substring(rs->text, line, ovector, rc, "text", text, TEXT_BUFFER_SIZE) < 0) {
+                continue;
+            }
+
+            /* Calculate array index for lines; stays -1 if the time is unusable. */
+            if(pcre_copy_named_substring(rs->text, line, ovector, rc, "hour", hour_s, TIME_BUFFER_SIZE) >= 0
+                    && pcre_copy_named_substring(rs->text, line, ovector, rc, "minute", min_s, TIME_BUFFER_SIZE) >= 0) {
+                int hour = parse_time_field(hour_s, 23);
+                int min = parse_time_field(min_s, 59);
+                if(hour >= 0 && min >= 0) {
+                    /* 0..95; assumes lines[] and hours[] hold at least 96
+                     * entries - TODO confirm against user.h/channel.h. */
+                    time_i = hour*4 + min / 15;
+                }
+            }
+
+            /* Count words (takes word_mutex internally, never nested). */
+            size_t characters;
+            int words = tally_words(text, &characters);
+
+            pthread_mutex_lock(&user_mutex);
+            struct user_t *user = user_get(nick);
+            if(time_i >= 0) {
+                user->lines[time_i]++;
+                channel->hours[time_i]++;
+            }
+            user->characters += characters;
+            user->words += words;
+            pthread_mutex_unlock(&user_mutex);
+            continue;
+        }
+
+        rc = pcre_exec(rs->join, rs->join_e, line, line_len, 0, 0, ovector, OVECTOR_SIZE);
+        if(rc > 0) {
+            char nick[NICK_BUFFER_SIZE];
+
+            if(pcre_copy_named_substring(rs->join, line, ovector, rc, "nick", nick, NICK_BUFFER_SIZE) < 0) {
+                continue;
+            }
+            pthread_mutex_lock(&user_mutex);
+            (void)user_get(nick);
+            pthread_mutex_unlock(&user_mutex);
+            continue;
+        }
+
+        rc = pcre_exec(rs->kick, rs->kick_e, line, line_len, 0, 0, ovector, OVECTOR_SIZE);
+        if(rc > 0) {
+            char nick[NICK_BUFFER_SIZE], victim[NICK_BUFFER_SIZE];
+
+            if(pcre_copy_named_substring(rs->kick, line, ovector, rc, "nick", nick, NICK_BUFFER_SIZE) < 0
+                    || pcre_copy_named_substring(rs->kick, line, ovector, rc, "victim", victim, NICK_BUFFER_SIZE) < 0) {
+                continue;
+            }
+            pthread_mutex_lock(&user_mutex);
+            struct user_t *user = user_get(nick),
+                          *victim_user = user_get(victim);
+            user->kicks++;
+            victim_user->kicked++;
+            pthread_mutex_unlock(&user_mutex);
+            continue;
+        }
+    }
+}
+
+/*
+ * Arguments handed to thread_func(); the fields are forwarded unchanged to
+ * process_file(). process() passes the same instance to every thread it
+ * starts for a file.
+ */
+struct thread_arg_t {
+ FILE *f; /* open log file to read lines from */
+ struct channel_t *channel; /* channel whose statistics are updated */
+ struct regexset_t *rs; /* regex set used to match the file's lines */
+};
+
+/*
+ * pthread entry point: unpacks the struct thread_arg_t pointed to by arg and
+ * runs process_file() on it.
+ *
+ * Always returns NULL. The original fell off the end of this non-void
+ * function, leaving the thread's exit value indeterminate.
+ */
+static void *thread_func(void *arg) {
+    struct thread_arg_t *ta = arg;
+
+    process_file(ta->f, ta->channel, ta->rs);
+    return NULL;
+}
+
+/*
+ * Parse every file of every configured channel and export the result.
+ *
+ * For each channel the user and word tables are initialised, each of the
+ * channel's files is parsed by up to thread_n threads running thread_func()
+ * on the same FILE, the channel is exported with export_xml(), and the
+ * tables are freed again.
+ *
+ * thread_n: number of parser threads per file; values below 1 are treated
+ *           as 1. If no thread can be started (allocation or
+ *           pthread_create() failure) the file is parsed in the calling
+ *           thread instead, so a file is never silently skipped.
+ *
+ * Files that cannot be opened are reported on stderr and skipped.
+ */
+void process(int thread_n) {
+    if(thread_n < 1) {
+        thread_n = 1;
+    }
+
+    pthread_mutex_init(&user_mutex, NULL);
+    pthread_mutex_init(&word_mutex, NULL);
+    /* Parsing stuff goes here. */
+    for(int chan_i = 0; chan_i < channel_get_count(); chan_i++) {
+        user_init();
+        word_init();
+        struct channel_t *channel = channel_get(chan_i);
+        printf("Channel %s\n", channel->name);
+
+        for(struct channel_file_t *file = channel->files; file; file = file->next) {
+            FILE *f = fopen(file->path, "r");
+            if(!f) {
+                fprintf(stderr, "\tFailed to open %s\n", file->path);
+                continue;
+            }
+            printf("\tParsing %s\n", file->path);
+
+            /* Shared by all threads of this file; it outlives them because
+             * they are joined before the end of this iteration. */
+            struct thread_arg_t ta;
+            ta.f = f;
+            ta.channel = channel;
+            ta.rs = file->rs;
+
+            pthread_t *threads = calloc((size_t)thread_n, sizeof *threads);
+            int started = 0;
+            if(threads) {
+                while(started < thread_n) {
+                    int err = pthread_create(&threads[started], NULL, thread_func, &ta);
+                    if(err) {
+                        fprintf(stderr, "\tFailed to start thread: %s\n", strerror(err));
+                        break;
+                    }
+                    started++;
+                }
+            }
+            if(!started) {
+                /* No worker available: do the work synchronously. */
+                thread_func(&ta);
+            }
+            /* Join only the threads that were actually created. */
+            for(int i = 0; i < started; i++) {
+                pthread_join(threads[i], NULL);
+            }
+            free(threads);
+
+            fclose(f);
+        }
+        /* `users` is declared outside this file (not visible here). */
+        export_xml(channel, users);
+        user_free();
+        word_free();
+    }
+    pthread_mutex_destroy(&user_mutex);
+    pthread_mutex_destroy(&word_mutex);
+}