diff options
author | Jon Bergli Heier <snakebite@jvnv.net> | 2009-11-22 03:00:54 +0100 |
---|---|---|
committer | Jon Bergli Heier <snakebite@jvnv.net> | 2009-11-22 03:00:54 +0100 |
commit | 95f4c7cb4a9f0a30ab9d78852725a45688a97512 (patch) | |
tree | c2dab013ed2769c31ca4b0233faa9cfa69224ab9 | |
parent | 5d8561e4257f13756010daca1bb098bdef9cb97c (diff) |
Added support for using pthreads.
Data processing (parsing) moved to parsing.c. The line parsing
code is moved to its own function which is called from one or
more threads (depends on the "threads" setting in the config file).
-rw-r--r-- | Makefile | 4 | ||||
-rw-r--r-- | config.c | 6 | ||||
-rw-r--r-- | config.h | 6 | ||||
-rw-r--r-- | main.c | 118 | ||||
-rw-r--r-- | parsing.c | 168 | ||||
-rw-r--r-- | parsing.h | 6 |
6 files changed, 191 insertions, 117 deletions
@@ -5,10 +5,12 @@ CFLAGS += -D_GNU_SOURCE CFLAGS += $(shell pkg-config --cflags libconfig) CFLAGS += $(shell pcre-config --cflags) CFLAGS += $(shell xml2-config --cflags) +CFLAGS += -pthread LDFLAGS += $(shell pkg-config --libs libconfig) LDFLAGS += $(shell pcre-config --libs) LDFLAGS += $(shell xml2-config --libs) -OBJECTS = main.o config.o regexset.o channel.o user.o word.o sdbm.o export_xml.o nick.o +LDFLAGS += -pthread +OBJECTS = main.o config.o regexset.o channel.o user.o word.o sdbm.o export_xml.o nick.o parsing.o TARGET = ircstats all: $(TARGET) @@ -3,11 +3,13 @@ #include <libconfig.h> +#include "config.h" #include "regexset.h" #include "channel.h" #include "nick.h" config_t config; +struct ircstats_config_t ircstats_config; int cfg_init() { config_init(&config); @@ -25,6 +27,10 @@ int cfg_init() { return 0; } + if(!config_lookup_int(&config, "threads", &ircstats_config.threads)) { + ircstats_config.threads = 1; + } + config_setting_t *regexes_setting = config_lookup(&config, "regexes"); if(!config_setting_is_aggregate(regexes_setting)) { fprintf(stderr, "Setting \"regexes\" must be an aggregate type.\n"); @@ -4,4 +4,10 @@ int cfg_init(); void cfg_free(); +struct ircstats_config_t { + long int threads; +}; + +extern struct ircstats_config_t ircstats_config; + #endif @@ -1,8 +1,3 @@ -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <wctype.h> -#include <wchar.h> #include <locale.h> #include "config.h" @@ -12,11 +7,7 @@ #include "word.h" #include "export_xml.h" #include "nick.h" - -#define NICK_BUFFER_SIZE 0x100 -#define TEXT_BUFFER_SIZE 0x400 -#define LINE_BUFFER_SIZE 0x400 -#define TIME_BUFFER_SIZE 0xf +#include "parsing.h" int main(int argc, char **argv) { /* Set locale. */ @@ -34,112 +25,7 @@ int main(int argc, char **argv) { return 1; } - /* Parsing stuff goes here. */ - for(int chan_i = 0; chan_i < channel_get_count(); chan_i++) { - user_init(); - word_init(); - struct channel_t *channel = channel_get(chan_i); - printf("Channel %s\n", channel->name); - struct channel_file_t *file = channel->files; - while(file) { - struct regexset_t *rs = file->rs; - FILE *f = fopen(file->path, "r"); - if(!f) { - fprintf(stderr, "\tFailed to open %s\n", file->path); - file = file->next; - continue; - } else - printf("\tParsing %s\n", file->path); - - char line[LINE_BUFFER_SIZE]; - while(fgets(line, LINE_BUFFER_SIZE, f)) { - int rc; - int ovector[30]; - - rc = pcre_exec(rs->text, rs->text_e, line, strlen(line), 0, 0, ovector, 30); - if(rc > 0) { - char nick[NICK_BUFFER_SIZE], text[TEXT_BUFFER_SIZE], hour_s[TIME_BUFFER_SIZE], min_s[TIME_BUFFER_SIZE]; - pcre_copy_named_substring(rs->text, line, ovector, rc, "nick", nick, NICK_BUFFER_SIZE); - pcre_copy_named_substring(rs->text, line, ovector, rc, "text", text, TEXT_BUFFER_SIZE); - pcre_copy_named_substring(rs->text, line, ovector, rc, "hour", hour_s, TIME_BUFFER_SIZE); - pcre_copy_named_substring(rs->text, line, ovector, rc, "minute", min_s, TIME_BUFFER_SIZE); - struct user_t *user = user_get(nick); - - /* Calculate array index for lines. */ - int hour, min, time_i; - hour = atoi(hour_s); - min = atoi(min_s); - time_i = hour*4 + min / 15; - - user->lines[time_i]++; - channel->hours[time_i]++; - - /* Count words. */ - wchar_t wtext[TEXT_BUFFER_SIZE]; - mbstowcs(wtext, text, TEXT_BUFFER_SIZE); - user->characters += wcslen(wtext); - wchar_t word[TEXT_BUFFER_SIZE]; - wchar_t *end = wcschr(wtext, '\0'); - *word = '\0'; - int len = 0; - for(wchar_t *pos = wtext; pos < end; pos++) { - if(iswblank(*pos)) { - if(len) { - user->words++; - word[len] = '\0'; - char mbword[TEXT_BUFFER_SIZE]; - wcstombs(mbword, word, TEXT_BUFFER_SIZE); - struct word_t *word_s = word_get(mbword); - word_s->count++; - } - len = 0; - *word = '\0'; - } else if(iswalpha(*pos)) { - word[len++] = towlower(*pos); - } else { - len = 0; - *word = '\0'; - } - } - if(len) { - user->words++; - word[len] = '\0'; - char mbword[TEXT_BUFFER_SIZE]; - wcstombs(mbword, word, TEXT_BUFFER_SIZE); - struct word_t *word_s = word_get(mbword); - word_s->count++; - } - continue; - } - - rc = pcre_exec(rs->join, rs->join_e, line, strlen(line), 0, 0, ovector, 30); - if(rc > 0) { - char nick[NICK_BUFFER_SIZE]; - pcre_copy_named_substring(rs->join, line, ovector, rc, "nick", nick, NICK_BUFFER_SIZE); - struct user_t *user = user_get(nick); - continue; - } - - rc = pcre_exec(rs->kick, rs->kick_e, line, strlen(line), 0, 0, ovector, 30); - if(rc > 0) { - char nick[NICK_BUFFER_SIZE], victim[NICK_BUFFER_SIZE]; - pcre_copy_named_substring(rs->kick, line, ovector, rc, "nick", nick, NICK_BUFFER_SIZE); - pcre_copy_named_substring(rs->kick, line, ovector, rc, "victim", victim, NICK_BUFFER_SIZE); - struct user_t *user = user_get(nick), - *victim_user = user_get(victim); - user->kicks++; - victim_user->kicked++; - continue; - } - } - fclose(f); - - file = file->next; - } - export_xml(channel, users); - user_free(); - word_free(); - } + process(ircstats_config.threads); nick_free(); cfg_free(); diff --git a/parsing.c b/parsing.c new file mode 100644 index 0000000..58a5303 --- /dev/null +++ b/parsing.c @@ -0,0 +1,168 @@ +#include <stdio.h> +#include <string.h> +#include <wctype.h> +#include <wchar.h> +#include <pthread.h> + +#include "parsing.h" +#include "channel.h" +#include "user.h" +#include "word.h" +#include "export_xml.h" + +#define NICK_BUFFER_SIZE 0x100 +#define TEXT_BUFFER_SIZE 0x400 +#define LINE_BUFFER_SIZE 0x400 +#define TIME_BUFFER_SIZE 0xf + +pthread_mutex_t user_mutex, word_mutex; + +static void process_file(FILE *f, struct channel_t *channel, struct regexset_t *rs) { + char line[LINE_BUFFER_SIZE]; + while(fgets(line, LINE_BUFFER_SIZE, f)) { + int rc; + int ovector[30]; + + rc = pcre_exec(rs->text, rs->text_e, line, strlen(line), 0, 0, ovector, 30); + if(rc > 0) { + char nick[NICK_BUFFER_SIZE], text[TEXT_BUFFER_SIZE], hour_s[TIME_BUFFER_SIZE], min_s[TIME_BUFFER_SIZE]; + pcre_copy_named_substring(rs->text, line, ovector, rc, "nick", nick, NICK_BUFFER_SIZE); + pcre_copy_named_substring(rs->text, line, ovector, rc, "text", text, TEXT_BUFFER_SIZE); + pcre_copy_named_substring(rs->text, line, ovector, rc, "hour", hour_s, TIME_BUFFER_SIZE); + pcre_copy_named_substring(rs->text, line, ovector, rc, "minute", min_s, TIME_BUFFER_SIZE); + pthread_mutex_lock(&user_mutex); + struct user_t *user = user_get(nick); + pthread_mutex_unlock(&user_mutex); + + /* Calculate array index for lines. */ + int hour, min, time_i; + hour = atoi(hour_s); + min = atoi(min_s); + time_i = hour*4 + min / 15; + + user->lines[time_i]++; + channel->hours[time_i]++; + + /* Count words. */ + wchar_t wtext[TEXT_BUFFER_SIZE]; + mbstowcs(wtext, text, TEXT_BUFFER_SIZE); + user->characters += wcslen(wtext); + wchar_t word[TEXT_BUFFER_SIZE]; + wchar_t *end = wcschr(wtext, '\0'); + *word = '\0'; + int len = 0; + for(wchar_t *pos = wtext; pos < end; pos++) { + if(iswblank(*pos)) { + if(len) { + user->words++; + word[len] = '\0'; + char mbword[TEXT_BUFFER_SIZE]; + wcstombs(mbword, word, TEXT_BUFFER_SIZE); + pthread_mutex_lock(&word_mutex); + struct word_t *word_s = word_get(mbword); + pthread_mutex_unlock(&word_mutex); + word_s->count++; + } + len = 0; + *word = '\0'; + } else if(iswalpha(*pos)) { + word[len++] = towlower(*pos); + } else { + len = 0; + *word = '\0'; + } + } + if(len) { + user->words++; + word[len] = '\0'; + char mbword[TEXT_BUFFER_SIZE]; + wcstombs(mbword, word, TEXT_BUFFER_SIZE); + pthread_mutex_lock(&word_mutex); + struct word_t *word_s = word_get(mbword); + pthread_mutex_unlock(&word_mutex); + word_s->count++; + } + continue; + } + + rc = pcre_exec(rs->join, rs->join_e, line, strlen(line), 0, 0, ovector, 30); + if(rc > 0) { + char nick[NICK_BUFFER_SIZE]; + pcre_copy_named_substring(rs->join, line, ovector, rc, "nick", nick, NICK_BUFFER_SIZE); + pthread_mutex_lock(&user_mutex); + struct user_t *user = user_get(nick); + pthread_mutex_unlock(&user_mutex); + continue; + } + + rc = pcre_exec(rs->kick, rs->kick_e, line, strlen(line), 0, 0, ovector, 30); + if(rc > 0) { + char nick[NICK_BUFFER_SIZE], victim[NICK_BUFFER_SIZE]; + pcre_copy_named_substring(rs->kick, line, ovector, rc, "nick", nick, NICK_BUFFER_SIZE); + pcre_copy_named_substring(rs->kick, line, ovector, rc, "victim", victim, NICK_BUFFER_SIZE); + pthread_mutex_lock(&user_mutex); + struct user_t *user = user_get(nick), + *victim_user = user_get(victim); + pthread_mutex_unlock(&user_mutex); + user->kicks++; + victim_user->kicked++; + continue; + } + } +} + +struct thread_arg_t { + FILE *f; + struct channel_t *channel; + struct regexset_t *rs; +}; + +static void *thread_func(void *arg) { + struct thread_arg_t *ta = arg; + process_file(ta->f, ta->channel, ta->rs); +} + +void process(int thread_n) { + pthread_mutex_init(&user_mutex, NULL); + pthread_mutex_init(&word_mutex, NULL); + /* Parsing stuff goes here. */ + for(int chan_i = 0; chan_i < channel_get_count(); chan_i++) { + user_init(); + word_init(); + struct channel_t *channel = channel_get(chan_i); + printf("Channel %s\n", channel->name); + struct channel_file_t *file = channel->files; + while(file) { + struct regexset_t *rs = file->rs; + FILE *f = fopen(file->path, "r"); + if(!f) { + fprintf(stderr, "\tFailed to open %s\n", file->path); + file = file->next; + continue; + } else + printf("\tParsing %s\n", file->path); + + pthread_t *threads; + threads = malloc(sizeof(pthread_t) * thread_n); + struct thread_arg_t ta; + ta.f = f; + ta.channel = channel; + ta.rs = rs; + for(int i = 0; i < thread_n; i++) { + pthread_create(&threads[i], NULL, thread_func, &ta); + } + for(int i = 0; i < thread_n; i++) { + pthread_join(threads[i], NULL); + } + free(threads); + + fclose(f); + file = file->next; + } + export_xml(channel, users); + user_free(); + word_free(); + } + pthread_mutex_destroy(&user_mutex); + pthread_mutex_destroy(&word_mutex); +} diff --git a/parsing.h b/parsing.h new file mode 100644 index 0000000..b991d55 --- /dev/null +++ b/parsing.h @@ -0,0 +1,6 @@ +#ifndef _PARSING_H_ +#define _PARSING_H_ + +void process(int thread_n); + +#endif |