summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Harley2020-01-24 05:15:35 +0000
committerTom Harley2020-01-24 05:15:35 +0000
commitc9f0de12be31eea463bf022f3eee76490b30f8dc (patch)
treea80718ff00fa2dcb1c52c2af6828feffc7f89d8b
downloadparapat-master.tar.gz
parapat-master.zip

Initial commit

HEADmaster
-rw-r--r--.gitignore7
-rw-r--r--Makefile30
-rw-r--r--debug.h18
-rw-r--r--parapat.c438
-rw-r--r--parapat.h35
-rw-r--r--queue.c63
-rw-r--r--queue.h25
-rw-r--r--test/Makefile23
-rw-r--r--test/farm-tests.c89
-rw-r--r--test/fib.c59
-rw-r--r--test/minunit.h17
-rw-r--r--test/pipeline-tests.c83
-rw-r--r--test/test.c55
13 files changed, 942 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..3d9a02f
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,7 @@
+*.o
+*.d
+*.sw?
+*.so
+*.a
+test-*
+*.time
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..b0f38ab
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,30 @@
+COMMON += -ggdb3 -O0 -Wpedantic -Wall -Werror -Wextra -std=c89
+TRGT = libparapat.a libparapat.so
+
+.PHONY: default all clean test
+
+default: all
+
+all: $(TRGT)
+
+%.o: %.c
+ $(CC) $(COMMON) -c -MMD -MP -o $@ $<
+
+%.pic.o: %.c
+ $(CC) $(COMMON) -c -fPIC -MMD -MP -o $@ $<
+
+libparapat.so: parapat.pic.o queue.pic.o
+ $(CC) $(COMMON) -shared -o $@ $^
+
+.NOTPARALLEL: libparapat.a
+
+libparapat.a: libparapat.a(parapat.o) libparapat.a(queue.o)
+
+test: $(TRGT)
+ (cd test && $(MAKE))
+
+clean:
+ $(RM) $(TRGT) *.o *.d
+ $(MAKE) -C test clean
+
+-include $(wildcard *.d)
diff --git a/debug.h b/debug.h
new file mode 100644
index 0000000..8be873a
--- /dev/null
+++ b/debug.h
@@ -0,0 +1,18 @@
+#ifndef DEBUG_H
+#define DEBUG_H
+
+#ifndef NDEBUG
+
+#include <assert.h>
+#include <stdio.h>
+
+#define debug(x) printf x
+
+#else
+
+#define assert(x)
+#define debug(x) (x)
+
+#endif /* NDEBUG */
+
+#endif /* DEBUG_H */
diff --git a/parapat.c b/parapat.c
new file mode 100644
index 0000000..f51c4a4
--- /dev/null
+++ b/parapat.c
@@ -0,0 +1,438 @@
+#include <stdlib.h>
+#include <pthread.h>
+
+#include "debug.h"
+#include "queue.h"
+
+#include "parapat.h"
+
+typedef struct {
+ emitter *efn;
+ void *arg;
+} handle_emitter_info;
+
+typedef struct {
+ worker *wfn;
+ void *arg;
+} handle_worker_info;
+
+typedef struct {
+ collector *cfn;
+ void *arg;
+} handle_collector_info;
+
+void *handle_emitter(queue *out, void *v) {
+ handle_emitter_info *hei = v;
+
+ int keep_going;
+ void *x;
+ do {
+ debug(("running emitter\n"));
+ keep_going = hei->efn(&x, hei->arg) == 0;
+ enqueue(out, x);
+ } while (keep_going);
+
+ debug(("emitter completed\n"));
+
+ pq_close(out);
+
+ return NULL;
+}
+
+void *handle_collector(queue *in, void *v) {
+ handle_collector_info *hci = v;
+
+ void *out;
+
+ while (1) {
+ void *res;
+ void *temp;
+
+ int stop = dequeue(in, &res);
+ if (stop) {
+ break;
+ }
+
+ temp = (void *) out;
+ debug(("running collector on %p and %p\n", temp, res));
+
+ out = hci->cfn(temp, res, hci->arg);
+ }
+
+ debug(("collector completed, final value %p\n", out));
+
+ return out;
+}
+
+void *handle_worker(queue *in, queue *out, void *v) {
+ handle_worker_info *hwi = v;
+
+ while (1) {
+ void *arg;
+
+ int stop = dequeue(in, &arg);
+ if (stop) {
+ break;
+ }
+
+ debug(("running worker on %p\n", arg));
+ enqueue(out, hwi->wfn(arg, hwi->arg));
+ }
+
+ debug(("worker completed\n"));
+
+ pq_close(out);
+
+ return NULL;
+}
+
+typedef struct pipe_piece {
+ pthread_t t_id;
+ pthread_t prev_t_id;
+ worker *fn;
+ queue *in;
+ queue *out;
+} pipe_piece;
+
+typedef struct {
+ pipe_piece *pieces;
+ int piece_count;
+ queue *in;
+ queue *out;
+} pipeline;
+
+typedef struct {
+ queue_emitter *efn;
+ queue *out;
+ void *arg;
+} queue_emitter_void_info;
+
+void *queue_emitter_void(void *v) {
+ queue_emitter_void_info *qevi = v;
+ qevi->efn(qevi->out, qevi->arg);
+ return 0;
+}
+
+typedef struct {
+ queue_worker *wfn;
+ queue *in;
+ queue *out;
+ void *arg;
+} queue_worker_void_info;
+
+void *queue_worker_void(void *v) {
+ queue_worker_void_info *qwvi = v;
+ qwvi->wfn(qwvi->in, qwvi->out, qwvi->arg);
+ return 0;
+}
+
+typedef struct {
+ queue_collector *cfn;
+ queue *in;
+ void *arg;
+} queue_collector_void_info;
+
+void *queue_collector_void(void *v) {
+ queue_collector_void_info *qcvi = v;
+ return qcvi->cfn(qcvi->in, qcvi->arg);
+}
+
+void *install_farm_queue(queue_worker *wfn, int n, queue *in, queue *out,
+ void *arg) {
+
+ queue_worker_void_info qwvi;
+ int i;
+
+ pthread_t *t_ids = malloc(sizeof *t_ids * n);
+
+ qwvi.in = in;
+ qwvi.out = out;
+ qwvi.wfn = wfn;
+ qwvi.arg = arg;
+
+ for (i = 0; i < n; ++i) {
+ pthread_create(t_ids + i, NULL, queue_worker_void, &qwvi);
+ debug(("thread %lu created\n", t_ids[i]));
+ }
+
+ for (i = 0; i < n; ++i) {
+ pthread_join(t_ids[i], NULL);
+ debug(("thread %lu joined\n", t_ids[i]));
+ }
+
+ pq_close(out);
+
+ return NULL;
+}
+
+void *install_farm(worker *wfn, int n, queue *in, queue *out, void *arg) {
+ handle_worker_info hwi;
+ hwi.wfn = wfn;
+ hwi.arg = arg;
+ return install_farm_queue(handle_worker, n, in, out, &hwi);
+}
+
+typedef struct {
+ queue_worker *wfn;
+ int n;
+ void *arg;
+} install_farm_worker_info;
+
+void *install_farm_worker(queue *in, queue *out, void *arg) {
+ install_farm_worker_info *ifqwi = arg;
+ return install_farm_queue(ifqwi->wfn, ifqwi->n, in, out, ifqwi->arg);
+}
+
+typedef struct {
+ worker *wfn;
+ int n;
+ queue *in;
+ queue *out;
+ void *arg;
+} install_farm_void_info;
+
+void *install_farm_void(void *v) {
+ install_farm_void_info *ifvi = v;
+ return install_farm(ifvi->wfn, ifvi->n, ifvi->in, ifvi->out, ifvi->arg);
+}
+
+void *create_queue_base(queue_emitter *efn, queue_worker *wfn,
+ queue_collector *cfn, void *efn_arg, void *wfn_arg, void *cfn_arg) {
+
+ pthread_t etid, wtid, ctid;
+
+ queue_emitter_void_info qevi;
+ queue_worker_void_info qwvi;
+ queue_collector_void_info qcvi;
+
+ queue in, out;
+
+ void *result;
+
+ qevi.efn = efn;
+ qevi.out = &in;
+ qevi.arg = efn_arg;
+
+ qwvi.wfn = wfn;
+ qwvi.in = &in;
+ qwvi.out = &out;
+ qwvi.arg = wfn_arg;
+
+ qcvi.cfn = cfn;
+ qcvi.in = &out;
+ qcvi.arg = cfn_arg;
+
+ init_queue(&in);
+ init_queue(&out);
+
+ pthread_create(&etid, NULL, queue_emitter_void, &qevi);
+ pthread_create(&wtid, NULL, queue_worker_void, &qwvi);
+ pthread_create(&ctid, NULL, queue_collector_void, &qcvi);
+
+ pthread_join(etid, &result);
+ if (result) {
+ return result;
+ }
+
+ pthread_join(wtid, &result);
+ if (result) {
+ return result;
+ }
+
+ pthread_join(ctid, &result);
+ return result;
+}
+
+void *create_queue_farm(queue_emitter *efn, queue_worker *wfn,
+ queue_collector *cfn, int n, void *efn_arg, void *wfn_arg,
+ void *cfn_arg) {
+
+ install_farm_worker_info ifwi;
+ ifwi.wfn = wfn;
+ ifwi.n = n;
+ ifwi.arg = wfn_arg;
+
+ return create_queue_base(efn, install_farm_worker, cfn, efn_arg, &ifwi,
+ cfn_arg);
+}
+
+void *create_farm(emitter *efn, worker *wfn, collector *cfn, int n,
+ void *efn_arg, void *wfn_arg, void *cfn_arg) {
+
+ handle_emitter_info hei;
+ handle_worker_info hwi;
+ handle_collector_info hci;
+
+ hei.efn = efn;
+ hei.arg = efn_arg;
+
+ hwi.wfn = wfn;
+ hwi.arg = wfn_arg;
+
+ hci.cfn = cfn;
+ hci.arg = cfn_arg;
+
+ return create_queue_farm(handle_emitter, handle_worker, handle_collector,
+ n, &hei, &hwi, &hci);
+}
+
+void *install_queue_pipeline(queue_worker **wfns, int n, queue *in,
+ queue *out, void **args) {
+
+ int i;
+ queue_worker_void_info *pieces;
+ queue *queues;
+ pthread_t *tids;
+
+ void *result;
+
+ if (!n) {
+ return NULL;
+ }
+
+ pieces = malloc(sizeof *pieces * n);
+ queues = malloc(sizeof *queues * (n - 1));
+ tids = malloc(sizeof *tids * n);
+
+ /*
+ * Initialise new queues.
+ */
+ for (i = 0; i < n - 1; ++i) {
+ init_queue(queues + i);
+ }
+
+ /*
+ * Set input queues.
+ */
+ pieces[0].in = in;
+ for (i = 1; i < n; ++i) {
+ pieces[i].in = &queues[i - 1];
+ }
+
+ /*
+ * Set output queues.
+ */
+ for (i = 0; i < n - 1; ++i) {
+ pieces[i].out = &queues[i];
+ }
+ pieces[n - 1].out = out;
+
+ /*
+ * Initialise a thread for each stage of the pipeline.
+ */
+ for (i = 0; i < n; ++i) {
+ queue_worker_void_info *qwvi = pieces + i;
+ qwvi->wfn = wfns[i];
+ qwvi->arg = args[i];
+
+ pthread_create(&tids[i], NULL, queue_worker_void, qwvi);
+ }
+
+ /*
+ * Wait for all stages t complete.
+ */
+ for (i = 0; i < n; ++i) {
+ pthread_join(tids[i], &result);
+ }
+
+ /*
+ * Waiting on all previous stages is required to ensure freeing resources is
+ * safe.
+ */
+ free(pieces);
+ free(queues);
+
+ /*
+ * Forward any issue from individual stages.
+ */
+ return result;
+}
+
+void *install_pipeline(worker **wfns, int n, queue *in, queue *out,
+ void **args) {
+
+ int i;
+ handle_worker_info *hwis = malloc(sizeof *hwis * n);
+ handle_worker_info **hwips = malloc(sizeof *hwips * n);
+ queue_worker **hwfns = malloc(sizeof *hwfns * n);
+ void *result;
+
+ for (i = 0; i < n; ++i) {
+ hwfns[i] = handle_worker;
+ hwis[i].wfn = wfns[i];
+ hwis[i].arg = args[i];
+ hwips[i] = &hwis[i];
+ }
+
+ result = install_queue_pipeline(hwfns, n, in, out, (void **) hwips);
+
+ free(hwis);
+ free(hwips);
+ free(hwfns);
+
+ return result;
+}
+
+typedef struct {
+ queue_worker **wfns;
+ int n;
+ void **args;
+} install_pipeline_worker_info;
+
+void *install_pipeline_worker(queue *in, queue *out, void *v) {
+ install_pipeline_worker_info *ipwi = v;
+ return install_queue_pipeline(ipwi->wfns, ipwi->n, in, out, ipwi->args);
+}
+
+void *create_queue_pipeline(queue_emitter *efn, queue_worker **wfns, int n,
+ queue_collector *cfn, void *efn_arg, void **wfn_args, void *cfn_arg) {
+
+ install_pipeline_worker_info ipwi;
+ ipwi.wfns = wfns;
+ ipwi.n = n;
+ ipwi.args = wfn_args;
+
+ return create_queue_base(efn, install_pipeline_worker, cfn, efn_arg, &ipwi,
+ cfn_arg);
+}
+
+void *create_pipeline(emitter *efn, worker **wfns, int n, collector *cfn,
+ void *efn_arg, void **wfn_args, void *cfn_arg) {
+
+ handle_emitter_info hei;
+ handle_worker_info *hwis = malloc(sizeof *hwis * n);
+ handle_collector_info hci;
+
+ queue_worker **hwfns = malloc(sizeof *hwfns * n);
+ handle_worker_info **hwips = malloc(sizeof *hwips * n);
+
+ void *result;
+
+ int i;
+ for (i = 0; i < n; ++i) {
+ hwfns[i] = handle_worker;
+ hwis[i].wfn = wfns[i];
+ hwips[i] = &hwis[i];
+
+ if (wfn_args) {
+ hwis[i].arg = wfn_args[i];
+ } else {
+ hwis[i].arg = NULL;
+ }
+ }
+
+ hei.efn = efn;
+ hei.arg = efn_arg;
+
+ hci.cfn = cfn;
+ hci.arg = cfn_arg;
+
+ result = create_queue_pipeline(handle_emitter, hwfns, n, handle_collector,
+ &hei, (void **) hwips, &hci);
+
+ free(hwis);
+ free(hwips);
+ free(hwfns);
+
+ return result;
+}
diff --git a/parapat.h b/parapat.h
new file mode 100644
index 0000000..cdf6340
--- /dev/null
+++ b/parapat.h
@@ -0,0 +1,35 @@
+/*
+ * vim: ft=c
+ */
+
+#ifndef PARPAT_H
+#define PARPAT_H
+
+#include "queue.h"
+
+typedef int (emitter)(void **, void *);
+typedef void *(worker)(void *, void *);
+typedef void *(collector)(void *, void *, void *);
+
+typedef void *(queue_emitter)(queue *, void *);
+typedef void *(queue_worker)(queue *, queue *, void *);
+typedef void *(queue_collector)(queue *, void *);
+
+void *create_farm(emitter *efn, worker *wfn, collector *cfn, int n,
+ void *efn_arg, void *wfn_arg, void *cfn_arg);
+void *create_pipeline(emitter *efn, worker **wfns, int n, collector *cfn,
+ void *efn_arg, void **wfn_args, void *cfn_arg);
+
+void *install_farm(worker *wfn, int n, queue *in, queue *out, void *arg);
+void *install_pipeline(worker **wfns, int n, queue *in, queue *out,
+ void **args);
+
+void *install_farm_queue(queue_worker *wfn, int n, queue *in, queue *out,
+ void *arg);
+void *install_queue_pipeline(queue_worker **wfns, int n, queue *in,
+ queue *out, void **args);
+
+void *create_queue_base(queue_emitter *efn, queue_worker *wfn,
+ queue_collector *cfn, void *efn_arg, void *wfn_arg, void *cfn_arg);
+
+#endif /* PARPAT_H */
diff --git a/queue.c b/queue.c
new file mode 100644
index 0000000..18530f3
--- /dev/null
+++ b/queue.c
@@ -0,0 +1,63 @@
+#include <semaphore.h>
+#include <pthread.h>
+#include <assert.h>
+
+#include "debug.h"
+
+#include "queue.h"
+
+int init_queue(queue *q) {
+ int i;
+ sem_init(&q->size, 0, 0);
+ pthread_mutex_init(&q->mtx, NULL);
+ q->front = 0;
+ q->back = 0;
+ for (i = 0; i < 100; ++i) {
+ q->elements[i] = NULL;
+ }
+ debug(("initialised queue %p\n", (void *) q));
+
+ return 0;
+}
+
+int enqueue(queue *q, void *v) {
+ pthread_mutex_lock(&q->mtx);
+
+ q->elements[q->front] = v;
+ q->front = (q->front + 1) % 100;
+
+ debug(("%p enqueued in %p\n", v, (void *) q));
+
+ sem_post(&q->size);
+ pthread_mutex_unlock(&q->mtx);
+ return 0;
+}
+
+int dequeue(queue *q, void **v) {
+ int result;
+
+ sem_wait(&q->size);
+ pthread_mutex_lock(&q->mtx);
+
+ if (q->back == q->front) {
+ sem_post(&q->size);
+ result = 1;
+
+ debug(("found %p to be closed\n", (void *) q));
+ } else {
+ *v = q->elements[q->back];
+ q->back = (q->back + 1) % 100;
+ result = 0;
+
+ debug(("%p dequeued from %p\n", *v, (void *) q));
+ }
+
+ pthread_mutex_unlock(&q->mtx);
+ return result;
+}
+
+int pq_close(queue *q) {
+ sem_post(&q->size);
+ debug(("%p closed\n", (void *) q));
+ return 0;
+}
diff --git a/queue.h b/queue.h
new file mode 100644
index 0000000..4ca9b4a
--- /dev/null
+++ b/queue.h
@@ -0,0 +1,25 @@
+/*
+ * vim: ft=c
+ */
+
+#ifndef QUEUE_H
+#define QUEUE_H
+
+#include <semaphore.h>
+#include <pthread.h>
+
+typedef struct {
+ pthread_mutex_t mtx;
+ sem_t size;
+ void *elements[100];
+ int front;
+ int back;
+} queue;
+
+int init_queue(queue *q);
+
+int enqueue(queue *q, void *v);
+int dequeue(queue *q, void **v);
+int pq_close(queue *q);
+
+#endif /* QUEUE_H */
diff --git a/test/Makefile b/test/Makefile
new file mode 100644
index 0000000..6605f50
--- /dev/null
+++ b/test/Makefile
@@ -0,0 +1,23 @@
+CFLAGS = -Werror -Wall -Wextra -Wpedantic -std=c89 -ggdb3 -O1 -L.. -I..
+LDLIBS = -lpthread -lparapat
+OUTPUT_OPTION := -MMD -MP $(OUTPUT_OPTION)
+TRGT = farm-tests pipeline-tests fib
+
+LD_LIBRARY_PATH := $(LD_LIBRARY_PATH):$(PWD)/..
+
+.PHONY: default test clean
+
+default: test
+
+test: test.time
+
+%-tests: ../libparapat.so
+
+test.time: $(TRGT)
+ for x in $(TRGT) ; do ./$$x ; done
+ touch test.time
+
+clean:
+ $(RM) *.d *.o $(TRGT)
+
+-include $(wildcard *.d)
diff --git a/test/farm-tests.c b/test/farm-tests.c
new file mode 100644
index 0000000..470aa7d
--- /dev/null
+++ b/test/farm-tests.c
@@ -0,0 +1,89 @@
+#include <stdlib.h>
+#include <stdio.h>
+
+#include "minunit.h"
+
+#include "parapat.h"
+
+#define unused(x) (x = x)
+
+int tests_run = 0;
+
+int x = 1;
+
+int test_emitter(void **v, void *_) {
+ int *p = malloc(sizeof *p);
+ unused(_);
+ *p = x++;
+ *v = p;
+ return x > 10;
+}
+
+void *test_worker(void *v, void *_) {
+ int *p = v;
+ unused(_);
+ *p *= 2;
+ return v;
+}
+
+void *test_collector(void *l, void *r, void *_) {
+ int *p = l;
+ int *q = r;
+ unused(_);
+ if (!p) {
+ p = malloc(sizeof *p);
+ *p = 0;
+ }
+ *p += *q;
+ /* free(q); */
+ return p;
+}
+
+char *farm_simple() {
+ int a, *res;
+ x = 1;
+ a = 0;
+ res = create_farm(test_emitter, test_worker, test_collector, 5, NULL, NULL,
+ &a);
+
+ mu_assert("error, *res != 110", *res == 110);
+ return 0;
+}
+
+/* char *farm_one() { */
+/* int a, *res; */
+/* x = 1; */
+/* a = 0; */
+/* res = create_farm(test_emitter, test_worker, test_collector, 1, NULL, NULL, */
+/* &a); */
+
+/* mu_assert("error, *res != 110", *res == 110); */
+/* return 0; */
+/* } */
+
+/* char *farm_zero() { */
+/* void *res = create_farm(test_emitter, test_worker, test_collector, 0, NULL, */
+/* NULL, NULL); */
+
+/* mu_assert("error, res != NULL", res == NULL); */
+/* return 0; */
+/* } */
+
+static char *all_tests() {
+ mu_run_test(farm_simple);
+ /* mu_run_test(farm_one); */
+ /* mu_run_test(farm_zero); */
+ return 0;
+}
+
+int main() {
+ char *error = all_tests();
+ if (error) {
+ printf("%s\n", error);
+ } else {
+ printf("ALL TESTS PASSED\n");
+ }
+ printf("Tests run: %d\n", tests_run);
+
+ return error != 0;
+}
diff --git a/test/fib.c b/test/fib.c
new file mode 100644
index 0000000..65bae54
--- /dev/null
+++ b/test/fib.c
@@ -0,0 +1,59 @@
+#include <stdio.h>
+#include <stdlib.h>
+
+#include "parapat.h"
+
+#define unused(x) (x = x)
+
+int fib(int n) {
+ int i, Fnew, Fold, temp,ans;
+
+ Fnew = 1; Fold = 0;
+ for (i = 2; i <= n; i++) {
+ temp = Fnew;
+ Fnew = Fnew + Fold;
+ Fold = temp;
+ }
+ ans = Fnew;
+ return ans;
+}
+
+int payload1(int i) {
+ unused(i);
+ return fib(900090000);
+}
+
+int payload2(int j) {
+ unused(j);
+ return fib(900090000);
+}
+
+int emit(void **out, void *v) {
+ int *q;
+ int *p = v;
+ *out = malloc(sizeof (int));
+ q = *out;
+ *q = (*p)++;
+ return *p >= 100;
+}
+
+void *fibw(void *v, void *a) {
+ int *p = v;
+ int x = payload1(*p);
+ unused(a);
+ *p = payload2(x);
+ return v;
+}
+
+void *printer(void *l, void *r, void *a) {
+ unused(l);
+ unused(r);
+ unused(a);
+ return NULL;
+}
+
+int main() {
+ int i = 0;
+ create_farm(emit, fibw, printer, 100, &i, NULL, NULL);
+ return 0;
+}
diff --git a/test/minunit.h b/test/minunit.h
new file mode 100644
index 0000000..a199bf2
--- /dev/null
+++ b/test/minunit.h
@@ -0,0 +1,17 @@
+/*
+ * MinUnit comes from a tech-note by Jera Design:
+ * http://www.jera.com/techinfo/jtns/jtn002.html
+ */
+
+#define mu_assert(message, test) do { \
+ if (!(test)) return message; \
+} while (0)
+
+#define mu_run_test(test) do { \
+ char *message; \
+ message = test(); \
+ tests_run++; \
+ if (message) return message; \
+} while (0)
+
+extern int tests_run;
diff --git a/test/pipeline-tests.c b/test/pipeline-tests.c
new file mode 100644
index 0000000..2d446a5
--- /dev/null
+++ b/test/pipeline-tests.c
@@ -0,0 +1,83 @@
+#include <stdlib.h>
+#include <stdio.h>
+
+#include "minunit.h"
+
+#include "parapat.h"
+
+#define unused(x) (x = x)
+
+int tests_run = 0;
+
+int x = 1;
+
+int test_emitter(void **v, void *_) {
+ int *p = malloc(sizeof *p);
+ unused(_);
+ *p = x++;
+ *v = p;
+ return x > 3;
+}
+
+void *test_worker(void *v, void *_) {
+ int *p = v;
+ unused(_);
+ *p *= 2;
+ return v;
+}
+
+void *test_collector(void *l, void *r, void *_) {
+ int *p = l;
+ int *q = r;
+ unused(_);
+ if (!p) {
+ p = malloc(sizeof *p);
+ *p = 0;
+ }
+ *p += *q;
+ free(q);
+ return p;
+}
+
+char *pipeline_simple() {
+ int a, *res;
+ worker *wfns[] = { test_worker, test_worker, test_worker, test_worker };
+ x = 1;
+ a = 0;
+ res = create_pipeline(test_emitter, wfns, 4, test_collector, NULL, NULL,
+ &a);
+
+ mu_assert("error, *res != 96", *res == 96);
+ return 0;
+}
+
+char *pipeline_one() {
+ int a, *res;
+ worker *wfns[] = { test_worker };
+ x = 1;
+ a = 0;
+ res = create_pipeline(test_emitter, wfns, 1, test_collector, NULL, NULL,
+ &a);
+
+ printf("answer is %d\n", *res);
+ mu_assert("error, *res != 12", *res == 12);
+ return 0;
+}
+
+static char *all_tests() {
+ mu_run_test(pipeline_simple);
+ mu_run_test(pipeline_one);
+ return 0;
+}
+
+int main() {
+ char *error = all_tests();
+ if (error) {
+ printf("%s\n", error);
+ } else {
+ printf("ALL TESTS PASSED\n");
+ }
+ printf("Tests run: %d\n", tests_run);
+
+ return error != 0;
+}
diff --git a/test/test.c b/test/test.c
new file mode 100644
index 0000000..8f51f62
--- /dev/null
+++ b/test/test.c
@@ -0,0 +1,55 @@
+#include <stdio.h>
+
+#include "parapat.h"
+
+typedef struct {
+ int v;
+} arg_t;
+
+void *test_work(void *v) {
+ arg_t *a = v;
+ a->v *= 2;
+ return a;
+}
+
+void *test_fold(void *a, void *b) {
+ arg_t *x = a, *y = b;
+
+ x->v += y->v;
+
+ return x;
+}
+
+int main() {
+ int i;
+ arg_t a, *res = &a;
+
+ arg_t args[10];
+ arg_t *ptrs[10];
+
+ work *fns[4] = {
+ &test_work,
+ &test_work,
+ &test_work,
+ &test_work
+ };
+
+ pipeline *pl;
+
+ for (i = 0; i < 10; ++i) {
+ args[i].v = i;
+ ptrs[i] = &args[i];
+ }
+
+ a.v = 0;
+
+ res = create_farm(&test_work, &test_fold, 10, (void **) &ptrs[0]);
+ printf("after farm: %d\n", res->v);
+
+ pl = create_pipeline(fns, 4);
+ push(pl->in, res);
+ res = pop(pl->out);
+ printf("after pipeline: %d\n", res->v);
+
+ return 0;
+}