From 95f4c7cb4a9f0a30ab9d78852725a45688a97512 Mon Sep 17 00:00:00 2001 From: Jon Bergli Heier Date: Sun, 22 Nov 2009 03:00:54 +0100 Subject: Added support for using pthreads. Data processing (parsing) moved to parsing.c. The line parsing code is moved to its own function which is called from one or more threads (depends on the "threads" setting in the config file). --- parsing.c | 168 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 168 insertions(+) create mode 100644 parsing.c (limited to 'parsing.c') diff --git a/parsing.c b/parsing.c new file mode 100644 index 0000000..58a5303 --- /dev/null +++ b/parsing.c @@ -0,0 +1,168 @@ +#include +#include +#include +#include +#include + +#include "parsing.h" +#include "channel.h" +#include "user.h" +#include "word.h" +#include "export_xml.h" + +#define NICK_BUFFER_SIZE 0x100 +#define TEXT_BUFFER_SIZE 0x400 +#define LINE_BUFFER_SIZE 0x400 +#define TIME_BUFFER_SIZE 0xf + +pthread_mutex_t user_mutex, word_mutex; + +static void process_file(FILE *f, struct channel_t *channel, struct regexset_t *rs) { + char line[LINE_BUFFER_SIZE]; + while(fgets(line, LINE_BUFFER_SIZE, f)) { + int rc; + int ovector[30]; + + rc = pcre_exec(rs->text, rs->text_e, line, strlen(line), 0, 0, ovector, 30); + if(rc > 0) { + char nick[NICK_BUFFER_SIZE], text[TEXT_BUFFER_SIZE], hour_s[TIME_BUFFER_SIZE], min_s[TIME_BUFFER_SIZE]; + pcre_copy_named_substring(rs->text, line, ovector, rc, "nick", nick, NICK_BUFFER_SIZE); + pcre_copy_named_substring(rs->text, line, ovector, rc, "text", text, TEXT_BUFFER_SIZE); + pcre_copy_named_substring(rs->text, line, ovector, rc, "hour", hour_s, TIME_BUFFER_SIZE); + pcre_copy_named_substring(rs->text, line, ovector, rc, "minute", min_s, TIME_BUFFER_SIZE); + pthread_mutex_lock(&user_mutex); + struct user_t *user = user_get(nick); + pthread_mutex_unlock(&user_mutex); + + /* Calculate array index for lines. */ + int hour, min, time_i; + hour = atoi(hour_s); + min = atoi(min_s); + time_i = hour*4 + min / 15; + + user->lines[time_i]++; + channel->hours[time_i]++; + + /* Count words. */ + wchar_t wtext[TEXT_BUFFER_SIZE]; + mbstowcs(wtext, text, TEXT_BUFFER_SIZE); + user->characters += wcslen(wtext); + wchar_t word[TEXT_BUFFER_SIZE]; + wchar_t *end = wcschr(wtext, '\0'); + *word = '\0'; + int len = 0; + for(wchar_t *pos = wtext; pos < end; pos++) { + if(iswblank(*pos)) { + if(len) { + user->words++; + word[len] = '\0'; + char mbword[TEXT_BUFFER_SIZE]; + wcstombs(mbword, word, TEXT_BUFFER_SIZE); + pthread_mutex_lock(&word_mutex); + struct word_t *word_s = word_get(mbword); + pthread_mutex_unlock(&word_mutex); + word_s->count++; + } + len = 0; + *word = '\0'; + } else if(iswalpha(*pos)) { + word[len++] = towlower(*pos); + } else { + len = 0; + *word = '\0'; + } + } + if(len) { + user->words++; + word[len] = '\0'; + char mbword[TEXT_BUFFER_SIZE]; + wcstombs(mbword, word, TEXT_BUFFER_SIZE); + pthread_mutex_lock(&word_mutex); + struct word_t *word_s = word_get(mbword); + pthread_mutex_unlock(&word_mutex); + word_s->count++; + } + continue; + } + + rc = pcre_exec(rs->join, rs->join_e, line, strlen(line), 0, 0, ovector, 30); + if(rc > 0) { + char nick[NICK_BUFFER_SIZE]; + pcre_copy_named_substring(rs->join, line, ovector, rc, "nick", nick, NICK_BUFFER_SIZE); + pthread_mutex_lock(&user_mutex); + struct user_t *user = user_get(nick); + pthread_mutex_unlock(&user_mutex); + continue; + } + + rc = pcre_exec(rs->kick, rs->kick_e, line, strlen(line), 0, 0, ovector, 30); + if(rc > 0) { + char nick[NICK_BUFFER_SIZE], victim[NICK_BUFFER_SIZE]; + pcre_copy_named_substring(rs->kick, line, ovector, rc, "nick", nick, NICK_BUFFER_SIZE); + pcre_copy_named_substring(rs->kick, line, ovector, rc, "victim", victim, NICK_BUFFER_SIZE); + pthread_mutex_lock(&user_mutex); + struct user_t *user = user_get(nick), + *victim_user = user_get(victim); + pthread_mutex_unlock(&user_mutex); + user->kicks++; + victim_user->kicked++; + continue; + } + } +} + +struct thread_arg_t { + FILE *f; + struct channel_t *channel; + struct regexset_t *rs; +}; + +static void *thread_func(void *arg) { + struct thread_arg_t *ta = arg; + process_file(ta->f, ta->channel, ta->rs); +} + +void process(int thread_n) { + pthread_mutex_init(&user_mutex, NULL); + pthread_mutex_init(&word_mutex, NULL); + /* Parsing stuff goes here. */ + for(int chan_i = 0; chan_i < channel_get_count(); chan_i++) { + user_init(); + word_init(); + struct channel_t *channel = channel_get(chan_i); + printf("Channel %s\n", channel->name); + struct channel_file_t *file = channel->files; + while(file) { + struct regexset_t *rs = file->rs; + FILE *f = fopen(file->path, "r"); + if(!f) { + fprintf(stderr, "\tFailed to open %s\n", file->path); + file = file->next; + continue; + } else + printf("\tParsing %s\n", file->path); + + pthread_t *threads; + threads = malloc(sizeof(pthread_t) * thread_n); + struct thread_arg_t ta; + ta.f = f; + ta.channel = channel; + ta.rs = rs; + for(int i = 0; i < thread_n; i++) { + pthread_create(&threads[i], NULL, thread_func, &ta); + } + for(int i = 0; i < thread_n; i++) { + pthread_join(threads[i], NULL); + } + free(threads); + + fclose(f); + file = file->next; + } + export_xml(channel, users); + user_free(); + word_free(); + } + pthread_mutex_destroy(&user_mutex); + pthread_mutex_destroy(&word_mutex); +} -- cgit v1.2.3