summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJon Bergli Heier <snakebite@jvnv.net>2009-11-22 03:00:54 +0100
committerJon Bergli Heier <snakebite@jvnv.net>2009-11-22 03:00:54 +0100
commit95f4c7cb4a9f0a30ab9d78852725a45688a97512 (patch)
treec2dab013ed2769c31ca4b0233faa9cfa69224ab9
parent5d8561e4257f13756010daca1bb098bdef9cb97c (diff)
Added support for using pthreads.
Data processing (parsing) moved to parsing.c. The line parsing code is moved to its own function which is called from one or more threads (depends on the "threads" setting in the config file).
-rw-r--r--Makefile4
-rw-r--r--config.c6
-rw-r--r--config.h6
-rw-r--r--main.c118
-rw-r--r--parsing.c168
-rw-r--r--parsing.h6
6 files changed, 191 insertions, 117 deletions
diff --git a/Makefile b/Makefile
index 193627d..d06e101 100644
--- a/Makefile
+++ b/Makefile
@@ -5,10 +5,12 @@ CFLAGS += -D_GNU_SOURCE
CFLAGS += $(shell pkg-config --cflags libconfig)
CFLAGS += $(shell pcre-config --cflags)
CFLAGS += $(shell xml2-config --cflags)
+CFLAGS += -pthread
LDFLAGS += $(shell pkg-config --libs libconfig)
LDFLAGS += $(shell pcre-config --libs)
LDFLAGS += $(shell xml2-config --libs)
-OBJECTS = main.o config.o regexset.o channel.o user.o word.o sdbm.o export_xml.o nick.o
+LDFLAGS += -pthread
+OBJECTS = main.o config.o regexset.o channel.o user.o word.o sdbm.o export_xml.o nick.o parsing.o
TARGET = ircstats
all: $(TARGET)
diff --git a/config.c b/config.c
index 6cc5aad..b9123b8 100644
--- a/config.c
+++ b/config.c
@@ -3,11 +3,13 @@
#include <libconfig.h>
+#include "config.h"
#include "regexset.h"
#include "channel.h"
#include "nick.h"
config_t config;
+struct ircstats_config_t ircstats_config;
int cfg_init() {
config_init(&config);
@@ -25,6 +27,10 @@ int cfg_init() {
return 0;
}
+ if(!config_lookup_int(&config, "threads", &ircstats_config.threads)) {
+ ircstats_config.threads = 1;
+ }
+
config_setting_t *regexes_setting = config_lookup(&config, "regexes");
if(!config_setting_is_aggregate(regexes_setting)) {
fprintf(stderr, "Setting \"regexes\" must be an aggregate type.\n");
diff --git a/config.h b/config.h
index 486919c..b5b9874 100644
--- a/config.h
+++ b/config.h
@@ -4,4 +4,10 @@
int cfg_init();
void cfg_free();
+struct ircstats_config_t {
+ long int threads;
+};
+
+extern struct ircstats_config_t ircstats_config;
+
#endif
diff --git a/main.c b/main.c
index e30e878..c864b8d 100644
--- a/main.c
+++ b/main.c
@@ -1,8 +1,3 @@
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <wctype.h>
-#include <wchar.h>
#include <locale.h>
#include "config.h"
@@ -12,11 +7,7 @@
#include "word.h"
#include "export_xml.h"
#include "nick.h"
-
-#define NICK_BUFFER_SIZE 0x100
-#define TEXT_BUFFER_SIZE 0x400
-#define LINE_BUFFER_SIZE 0x400
-#define TIME_BUFFER_SIZE 0xf
+#include "parsing.h"
int main(int argc, char **argv) {
/* Set locale. */
@@ -34,112 +25,7 @@ int main(int argc, char **argv) {
return 1;
}
- /* Parsing stuff goes here. */
- for(int chan_i = 0; chan_i < channel_get_count(); chan_i++) {
- user_init();
- word_init();
- struct channel_t *channel = channel_get(chan_i);
- printf("Channel %s\n", channel->name);
- struct channel_file_t *file = channel->files;
- while(file) {
- struct regexset_t *rs = file->rs;
- FILE *f = fopen(file->path, "r");
- if(!f) {
- fprintf(stderr, "\tFailed to open %s\n", file->path);
- file = file->next;
- continue;
- } else
- printf("\tParsing %s\n", file->path);
-
- char line[LINE_BUFFER_SIZE];
- while(fgets(line, LINE_BUFFER_SIZE, f)) {
- int rc;
- int ovector[30];
-
- rc = pcre_exec(rs->text, rs->text_e, line, strlen(line), 0, 0, ovector, 30);
- if(rc > 0) {
- char nick[NICK_BUFFER_SIZE], text[TEXT_BUFFER_SIZE], hour_s[TIME_BUFFER_SIZE], min_s[TIME_BUFFER_SIZE];
- pcre_copy_named_substring(rs->text, line, ovector, rc, "nick", nick, NICK_BUFFER_SIZE);
- pcre_copy_named_substring(rs->text, line, ovector, rc, "text", text, TEXT_BUFFER_SIZE);
- pcre_copy_named_substring(rs->text, line, ovector, rc, "hour", hour_s, TIME_BUFFER_SIZE);
- pcre_copy_named_substring(rs->text, line, ovector, rc, "minute", min_s, TIME_BUFFER_SIZE);
- struct user_t *user = user_get(nick);
-
- /* Calculate array index for lines. */
- int hour, min, time_i;
- hour = atoi(hour_s);
- min = atoi(min_s);
- time_i = hour*4 + min / 15;
-
- user->lines[time_i]++;
- channel->hours[time_i]++;
-
- /* Count words. */
- wchar_t wtext[TEXT_BUFFER_SIZE];
- mbstowcs(wtext, text, TEXT_BUFFER_SIZE);
- user->characters += wcslen(wtext);
- wchar_t word[TEXT_BUFFER_SIZE];
- wchar_t *end = wcschr(wtext, '\0');
- *word = '\0';
- int len = 0;
- for(wchar_t *pos = wtext; pos < end; pos++) {
- if(iswblank(*pos)) {
- if(len) {
- user->words++;
- word[len] = '\0';
- char mbword[TEXT_BUFFER_SIZE];
- wcstombs(mbword, word, TEXT_BUFFER_SIZE);
- struct word_t *word_s = word_get(mbword);
- word_s->count++;
- }
- len = 0;
- *word = '\0';
- } else if(iswalpha(*pos)) {
- word[len++] = towlower(*pos);
- } else {
- len = 0;
- *word = '\0';
- }
- }
- if(len) {
- user->words++;
- word[len] = '\0';
- char mbword[TEXT_BUFFER_SIZE];
- wcstombs(mbword, word, TEXT_BUFFER_SIZE);
- struct word_t *word_s = word_get(mbword);
- word_s->count++;
- }
- continue;
- }
-
- rc = pcre_exec(rs->join, rs->join_e, line, strlen(line), 0, 0, ovector, 30);
- if(rc > 0) {
- char nick[NICK_BUFFER_SIZE];
- pcre_copy_named_substring(rs->join, line, ovector, rc, "nick", nick, NICK_BUFFER_SIZE);
- struct user_t *user = user_get(nick);
- continue;
- }
-
- rc = pcre_exec(rs->kick, rs->kick_e, line, strlen(line), 0, 0, ovector, 30);
- if(rc > 0) {
- char nick[NICK_BUFFER_SIZE], victim[NICK_BUFFER_SIZE];
- pcre_copy_named_substring(rs->kick, line, ovector, rc, "nick", nick, NICK_BUFFER_SIZE);
- pcre_copy_named_substring(rs->kick, line, ovector, rc, "victim", victim, NICK_BUFFER_SIZE);
- struct user_t *user = user_get(nick),
- *victim_user = user_get(victim);
- user->kicks++;
- victim_user->kicked++;
- continue;
- }
- }
- fclose(f);
-
- file = file->next;
- }
- export_xml(channel, users);
- user_free();
- word_free();
- }
+ process(ircstats_config.threads);
nick_free();
cfg_free();
diff --git a/parsing.c b/parsing.c
new file mode 100644
index 0000000..58a5303
--- /dev/null
+++ b/parsing.c
@@ -0,0 +1,168 @@
+#include <stdio.h>
+#include <string.h>
+#include <wctype.h>
+#include <wchar.h>
+#include <pthread.h>
+
+#include "parsing.h"
+#include "channel.h"
+#include "user.h"
+#include "word.h"
+#include "export_xml.h"
+
+#define NICK_BUFFER_SIZE 0x100
+#define TEXT_BUFFER_SIZE 0x400
+#define LINE_BUFFER_SIZE 0x400
+#define TIME_BUFFER_SIZE 0xf
+
+pthread_mutex_t user_mutex, word_mutex;
+
+static void process_file(FILE *f, struct channel_t *channel, struct regexset_t *rs) {
+ char line[LINE_BUFFER_SIZE];
+ while(fgets(line, LINE_BUFFER_SIZE, f)) {
+ int rc;
+ int ovector[30];
+
+ rc = pcre_exec(rs->text, rs->text_e, line, strlen(line), 0, 0, ovector, 30);
+ if(rc > 0) {
+ char nick[NICK_BUFFER_SIZE], text[TEXT_BUFFER_SIZE], hour_s[TIME_BUFFER_SIZE], min_s[TIME_BUFFER_SIZE];
+ pcre_copy_named_substring(rs->text, line, ovector, rc, "nick", nick, NICK_BUFFER_SIZE);
+ pcre_copy_named_substring(rs->text, line, ovector, rc, "text", text, TEXT_BUFFER_SIZE);
+ pcre_copy_named_substring(rs->text, line, ovector, rc, "hour", hour_s, TIME_BUFFER_SIZE);
+ pcre_copy_named_substring(rs->text, line, ovector, rc, "minute", min_s, TIME_BUFFER_SIZE);
+ pthread_mutex_lock(&user_mutex);
+ struct user_t *user = user_get(nick);
+ pthread_mutex_unlock(&user_mutex);
+
+ /* Calculate array index for lines. */
+ int hour, min, time_i;
+ hour = atoi(hour_s);
+ min = atoi(min_s);
+ time_i = hour*4 + min / 15;
+
+ user->lines[time_i]++;
+ channel->hours[time_i]++;
+
+ /* Count words. */
+ wchar_t wtext[TEXT_BUFFER_SIZE];
+ mbstowcs(wtext, text, TEXT_BUFFER_SIZE);
+ user->characters += wcslen(wtext);
+ wchar_t word[TEXT_BUFFER_SIZE];
+ wchar_t *end = wcschr(wtext, '\0');
+ *word = '\0';
+ int len = 0;
+ for(wchar_t *pos = wtext; pos < end; pos++) {
+ if(iswblank(*pos)) {
+ if(len) {
+ user->words++;
+ word[len] = '\0';
+ char mbword[TEXT_BUFFER_SIZE];
+ wcstombs(mbword, word, TEXT_BUFFER_SIZE);
+ pthread_mutex_lock(&word_mutex);
+ struct word_t *word_s = word_get(mbword);
+ pthread_mutex_unlock(&word_mutex);
+ word_s->count++;
+ }
+ len = 0;
+ *word = '\0';
+ } else if(iswalpha(*pos)) {
+ word[len++] = towlower(*pos);
+ } else {
+ len = 0;
+ *word = '\0';
+ }
+ }
+ if(len) {
+ user->words++;
+ word[len] = '\0';
+ char mbword[TEXT_BUFFER_SIZE];
+ wcstombs(mbword, word, TEXT_BUFFER_SIZE);
+ pthread_mutex_lock(&word_mutex);
+ struct word_t *word_s = word_get(mbword);
+ pthread_mutex_unlock(&word_mutex);
+ word_s->count++;
+ }
+ continue;
+ }
+
+ rc = pcre_exec(rs->join, rs->join_e, line, strlen(line), 0, 0, ovector, 30);
+ if(rc > 0) {
+ char nick[NICK_BUFFER_SIZE];
+ pcre_copy_named_substring(rs->join, line, ovector, rc, "nick", nick, NICK_BUFFER_SIZE);
+ pthread_mutex_lock(&user_mutex);
+ struct user_t *user = user_get(nick);
+ pthread_mutex_unlock(&user_mutex);
+ continue;
+ }
+
+ rc = pcre_exec(rs->kick, rs->kick_e, line, strlen(line), 0, 0, ovector, 30);
+ if(rc > 0) {
+ char nick[NICK_BUFFER_SIZE], victim[NICK_BUFFER_SIZE];
+ pcre_copy_named_substring(rs->kick, line, ovector, rc, "nick", nick, NICK_BUFFER_SIZE);
+ pcre_copy_named_substring(rs->kick, line, ovector, rc, "victim", victim, NICK_BUFFER_SIZE);
+ pthread_mutex_lock(&user_mutex);
+ struct user_t *user = user_get(nick),
+ *victim_user = user_get(victim);
+ pthread_mutex_unlock(&user_mutex);
+ user->kicks++;
+ victim_user->kicked++;
+ continue;
+ }
+ }
+}
+
+struct thread_arg_t {
+ FILE *f;
+ struct channel_t *channel;
+ struct regexset_t *rs;
+};
+
+static void *thread_func(void *arg) {
+ struct thread_arg_t *ta = arg;
+ process_file(ta->f, ta->channel, ta->rs);
+}
+
+void process(int thread_n) {
+ pthread_mutex_init(&user_mutex, NULL);
+ pthread_mutex_init(&word_mutex, NULL);
+ /* Parsing stuff goes here. */
+ for(int chan_i = 0; chan_i < channel_get_count(); chan_i++) {
+ user_init();
+ word_init();
+ struct channel_t *channel = channel_get(chan_i);
+ printf("Channel %s\n", channel->name);
+ struct channel_file_t *file = channel->files;
+ while(file) {
+ struct regexset_t *rs = file->rs;
+ FILE *f = fopen(file->path, "r");
+ if(!f) {
+ fprintf(stderr, "\tFailed to open %s\n", file->path);
+ file = file->next;
+ continue;
+ } else
+ printf("\tParsing %s\n", file->path);
+
+ pthread_t *threads;
+ threads = malloc(sizeof(pthread_t) * thread_n);
+ struct thread_arg_t ta;
+ ta.f = f;
+ ta.channel = channel;
+ ta.rs = rs;
+ for(int i = 0; i < thread_n; i++) {
+ pthread_create(&threads[i], NULL, thread_func, &ta);
+ }
+ for(int i = 0; i < thread_n; i++) {
+ pthread_join(threads[i], NULL);
+ }
+ free(threads);
+
+ fclose(f);
+ file = file->next;
+ }
+ export_xml(channel, users);
+ user_free();
+ word_free();
+ }
+ pthread_mutex_destroy(&user_mutex);
+ pthread_mutex_destroy(&word_mutex);
+}
diff --git a/parsing.h b/parsing.h
new file mode 100644
index 0000000..b991d55
--- /dev/null
+++ b/parsing.h
@@ -0,0 +1,6 @@
+#ifndef _PARSING_H_
+#define _PARSING_H_
+
+void process(int thread_n);
+
+#endif