tttm

git clone https://orangeshoelaces.net/git/tttm.git

3ddabf33663da7008be59e2d4562bdfbf93a3eed

Author: Vasily Kolobkov on 05/29/2016

Committer: Vasily Kolobkov on 05/29/2016

Draft message plumbing

- tighten up read/write loops
- fix par_procure to read enough data in before sliding window
- and a few other random fixes

Stats

imap.c   |  27 +-
imap.h   |   2 +-
laxsrc.c |  13 +-
parser.c |  54 ++-
tttm.c   | 164 +++++++-
5 files changed, 218 insertions(+), 42 deletions(-)

Patch

diff --git a/imap.c b/imap.c
index a1e2fc5..5fa99d7 100644
--- a/imap.c
+++ b/imap.c
@@ -1,3 +1,4 @@
+#include <errno.h>
 #include <limits.h>
 #include <stdarg.h>
 #include <stddef.h>
@@ -216,6 +217,8 @@ imap_cmd(struct imapctx *con, int nstate, const char *cmd,
 		    imap_matchresp(con, unilat, b, &e)) && e) {
 			break;
 		}
+		if (cache != -1 && lseek(cache, 0, SEEK_END) == -1)
+			goto eio;
 	}
 	if (e) {
 		goto exit;
@@ -231,6 +234,9 @@ imap_cmd(struct imapctx *con, int nstate, const char *cmd,
  exit:
 	va_end(ap);
 	return e;
+ eio:
+	e = TE_CACHEIO;
+	goto exit;
  eproto:
 	e = TE_PROTO;
 	goto exit;
@@ -490,6 +496,9 @@ dig_fetch(struct imapctx *con, int buf, void *ctx)
 	union parnode *f, *b, *nstr;
 	uint32_t msn;
 	struct msgd *m;
+	char *bcur;
+	size_t blen;
+	ssize_t n;
 
 	e = 0;
 	args = ctx;
@@ -497,7 +506,7 @@ dig_fetch(struct imapctx *con, int buf, void *ctx)
 	msn = (f + 1)->num.val;
 	if (msn < args->lo || msn > args->hi)
 		goto exit;
-	if ((b = par_seld(f, IP_822BODY, -1)))
+	if (!(b = par_seld(f, IP_822BODY, -1)))
 		goto exit;
 	nstr = b + 3;
 	if (islit(nstr, IL_NIL))
@@ -508,9 +517,19 @@ dig_fetch(struct imapctx *con, int buf, void *ctx)
 	if (buf) {
 		if ((m->off = lseek(args->stor, 0, SEEK_END)) == -1)
 			goto eio;
-		if (write(args->stor, con->rep + nstr->str.off,
-		    nstr->str.len) == -1)
-			goto eio;
+		bcur = con->rep + nstr->str.off;
+		blen = nstr->str.len;
+		while (blen) {
+			if ((n = write(args->stor, bcur, blen)) == 0) {
+				goto eio;
+			} else if (n == -1) {
+				if (errno == EINTR || errno == EAGAIN)
+					continue;
+				else goto eio;
+			}
+			blen -= n;
+			bcur += n;
+		}
 	}
 	if (nstr->type != PN_LSTR)
 		goto eproto;
diff --git a/imap.h b/imap.h
index 9aaefb5..d9565f0 100644
--- a/imap.h
+++ b/imap.h
@@ -8,7 +8,7 @@ struct imapctx {
 	int           out;
 
 	unsigned int  tag;
-	char          rep[2048];
+	char          rep[1024];
 	union parnode par[1024];
 
 	int           caps;
diff --git a/laxsrc.c b/laxsrc.c
index c94feb3..d6f57cb 100644
--- a/laxsrc.c
+++ b/laxsrc.c
@@ -1,3 +1,4 @@
+#include <errno.h>
 #include <poll.h>
 #include <stddef.h>
 #include <string.h>
@@ -69,14 +70,14 @@ laxsrc_read(struct laxsrc *s, char *dst, size_t lo, size_t hi, int *e)
 		if (ready == 0) goto eto;
 
 		if (pd.revents & (POLLIN | POLLHUP)) {
-			n = read(pd.fd, dst + len, hi - len);
-			if (n == -1) {
-				goto ein;
-			} else if (n == 0) {
+			if ((n = read(pd.fd, dst + len, hi - len)) == 0) {
 				goto eof;
-			} else {
-				len += n;
+			} else if (n == -1) {
+				if (errno == EINTR || errno == EAGAIN)
+					continue;
+				else goto ein;
 			}
+			len += n;
 		}
 	}
 
diff --git a/parser.c b/parser.c
index ecb0ed8..2e1de9c 100644
--- a/parser.c
+++ b/parser.c
@@ -4,6 +4,7 @@
    or 0 is dispensed to the parse context.
 */
 
+#include <errno.h>
 #include <stdarg.h>
 #include <stdint.h>
 #include <stdlib.h>
@@ -444,24 +445,24 @@ int
 par_procure(struct parctx *p, size_t off, size_t len)
 {
 	int e;
-	size_t tend;
-	int wndmiss, readbound;
+	size_t send, wndshort;
 
 	e = TE_OK;
-	tend = off + len;
+	send = off + len;
 
-	if (off >= p->wndoff &&
-	    off + len <= p->wndoff + p->wndlee)
+	if (off >= p->wndoff && send <= p->wndoff + p->wndlee)
 		goto exit;
 
-	wndmiss = off < p->wndoff || tend > p->wndoff + wndcap;
-	if (wndmiss && (e = par_movewnd(p, off)))
-		goto exit;
-
-	readbound = tend > p->strlen;
-	if (readbound)
-		e = par_readnup(p, tend - p->strlen);
+	while (send > p->strlen) {
+		if ((p->wndlee == wndcap) && (e = par_movewnd(p, p->strlen)))
+			goto exit;
+		wndshort = MIN(send - p->strlen, wndcap - p->wndlee);
+		if ((e = par_readnup(p, wndshort)))
+			goto exit;
+	}
 
+	if (off < p->wndoff || send > p->wndoff + wndcap)
+	    e = par_movewnd(p, off);
  exit:
 	return e;
 }
@@ -554,6 +555,7 @@ par_prycache(struct parctx *p, size_t len)
 {
 	int e;
 	size_t flen;
+	ssize_t n;
 	struct stat cs;
 
 	e = TE_OK;
@@ -564,9 +566,13 @@ par_prycache(struct parctx *p, size_t len)
 	if (flen < MAX(len, p->corig) || flen > INT64_MAX)
 		goto eover;
 
-	if (cs.st_size < flen &&
-	    pwrite(p->cache, "", 1, flen - 1) != 1)
-		goto eio;
+	if (cs.st_size < flen) {
+		while ((n = pwrite(p->cache, "", 1, flen - 1)) <= 0) {
+			if (n == -1 && (errno == EINTR || errno == EAGAIN))
+				continue;
+			else goto eio;
+		}
+	}
 
  exit:
 	return e;
@@ -1394,27 +1400,29 @@ p_lit(struct parctx *p, int val)
 int
 p_lstr(struct parctx *p)
 {
+	int prefix;
 	size_t len;
 	struct parcur b, n;
 
-	p_chk(p, &b) && p_lit(p, IL_OBRACE) &&
+	prefix = p_chk(p, &b) && p_lit(p, IL_OBRACE) &&
 	    p_chk(p, &n) && p_num(p) && p_lit(p, IL_CBRACE) &&
 	    p_lit(p, IL_EOL) || p_rwd(p, &b);
 
-	if (p->e) goto exit;
+	if (!prefix)
+		goto exit;
+
 	len = n.pt->num.val;
+	p->cur.pt = b.pt;
+
 	if (len > UINT32_MAX) goto exl;
 	if (len > 0 && (p->e = par_procure(p, p->cur.off + len - 1, 1)))
-		goto eproc;
-	if (!p_insstr(p, PN_LSTR, p->cur.off, len)) goto exit;
+		goto exit;
+	if (!p_insstr(p, PN_LSTR, p->corig + p->cur.off, len))
+		goto exit;
 
 	p->cur.off += len;
  exit:
 	return p->e == TE_OK;
- eproc:
-	if (p->e == TE_EOF)
-		p->e = TE_PARSE;
-	goto exit;
  exl:
 	p->e = TE_XLSTR;
 	goto exit;
diff --git a/tttm.c b/tttm.c
index fdfcdec..41677ea 100644
--- a/tttm.c
+++ b/tttm.c
@@ -1,6 +1,13 @@
 #include <err.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <stdint.h>
 #include <stdlib.h>
 #include <unistd.h>
+#include <sys/mman.h>
+#include <sys/wait.h>
+#include <sys/stat.h>
 
 #include "errors.h"
 #include "laxsrc.h"
@@ -12,26 +19,37 @@
 /*
    Run fetch, pipe and delete cycle on this many messages a pop.
 */
-#define BATCHSZ 10
+#define BATCHSZ 2
 
 /*
    Retry fetching mail at most this many times after getting a nil fetch.
 */
 #define MAXREFC 2
 
-static int	pipemsg(struct msgd *, int);
+static int	pipemsg(struct msgd *, int, const char *);
+static int	sink_init(const char *, pid_t *, int *);
+static int	sink_free(pid_t, int);
 static void	fin(struct imapctx *, int, const char *);
 static void	terr(int, const char *);
 
+extern char **environ;
+
+static size_t page;
+
 int
 main(int argc, char **argv)
 {
 	int e;
 	struct imapctx con;
-	char *name, *pass;
+	char *name, *pass, *sink;
 	int tmp;
 	int refetchc, batchsz, mpiped;
 	struct msgd bag[BATCHSZ], *m, *bend;
+	long sv;
+
+	if ((sv = sysconf(_SC_PAGESIZE)) == -1)
+		err(1, "failed to find out page size");
+	page = sv;
 
 	if ((e = imap_init(&con, STDIN_FILENO, STDOUT_FILENO))) {
 		terr(e, "failed to initialize session");
@@ -42,6 +60,9 @@ main(int argc, char **argv)
 
 	name = argv[1];
 	pass = argv[2];
+	sink = argv[3];
+	tmp = open("tmp", O_RDWR | O_CREAT, S_IRWXU);
+
 	e = imap_login(&con, name, pass);
 	if (e == TE_NO) {
 		warnx("login credentials rejected");
@@ -77,10 +98,8 @@ main(int argc, char **argv)
 		}
 		refetchc = 0;
 		for (m = bag; m < bend; m++) {
-			if ((e = pipemsg(m, tmp))) {
-				warn("failed to pipe message");
+			if (pipemsg(m, tmp, sink) == -1)
 				break;
-			}
 		}
 		mpiped = m - bag;
 		if (mpiped == 0) {
@@ -108,9 +127,138 @@ main(int argc, char **argv)
 }
 
 int
-pipemsg(struct msgd *m, int stor)
+pipemsg(struct msgd *m, int stor, const char *cmd)
 {
-	return 0;
+	int e, sin;
+	void (*origpipe)(int);
+	size_t scur, send, wlen, wlee;
+	char *wnd, *wcur;
+	pid_t sid;
+	ssize_t n;
+
+	e = -1;
+	if (lseek(stor, m->off, SEEK_SET) == -1) {
+		warn("failed to seek in temporary file");
+		goto exit;
+	}
+	if ((origpipe = signal(SIGPIPE, SIG_IGN)) == SIG_ERR) {
+		warn("failed to install SIGPIPE handler");
+		goto exit;
+	}
+	if (sink_init(cmd, &sid, &sin) == -1)
+		goto csig;
+
+	for (scur = m->off, send = scur + m->len; scur < send; scur += wlen) {
+		wlen = MIN(send - scur, page);
+		wnd = mmap(0, wlen, PROT_READ, MAP_PRIVATE, stor, scur);
+		if (wnd == MAP_FAILED) {
+			warn("failed to map temporary file");
+			goto csink;
+		}
+		wcur = wnd;
+		wlee = wlen;
+		while (wlee) {
+			if ((n = write(sin, wcur, wlee)) == 0) {
+				break;
+			} else if (n == -1) {
+				if (errno == EINTR || errno == EAGAIN)
+					continue;
+				else break;
+			}
+			wcur += n;
+			wlee -= n;
+		}
+		if (munmap(wnd, wlen) == -1) {
+			warn("failed to unmap temporary file");
+			goto csink;
+		}
+		if (n == 0) {
+			warn("sink shut down unexpectedly");
+			goto csink;
+		} else if (n == -1) {
+			warn("failed to write to sink");
+			goto csink;
+		}
+	}
+	e = 0;
+ csink:
+	if (sink_free(sid, sin) == -1)
+		e = -1;
+ csig:
+	if (signal(SIGPIPE, origpipe) == SIG_ERR) {
+		e = -1;
+		warn("failed to restore SIGPIPE handler");
+	}
+ exit:
+	return e;
+}
+
+int
+sink_init(const char *cmd, pid_t *sid, int *sin)
+{
+	int e;
+	int pd[2], r, w;
+	char *args[4];
+
+	e = -1;
+	if (pipe(pd) == -1) {
+		warn("failed to create pipe");
+		goto exit;
+	}
+	r = pd[0];
+	*sin = w = pd[1];
+	*sid = fork();
+	if (*sid == -1) {
+		warn("failed to fork");
+		close(r);
+		close(w);
+		goto exit;
+	} else if (*sid == 0) {
+		if (dup2(r, 0) == -1 || close(r) == -1 || close(w) == -1) {
+			warn("failed to setup sink");
+			_exit(1);
+		}
+		args[0] = "sh";
+		args[1] = "-c";
+		args[2] = (char *)cmd;
+		args[3] = 0;
+		execve("/bin/sh", args, environ);
+		warn("failed to launch sink");
+		_exit(1);
+	}
+	if (close(r) == -1) {
+		warn("failed to let read end of pipe loose");
+		close(w);
+		goto exit;
+	}
+	e = 0;
+ exit:
+	return e;
+}
+
+int
+sink_free(pid_t sid, int in)
+{
+	int e, s;
+
+	e = -1;
+	if (close(in) == -1) {
+		warn("failed to close write end of pipe");
+		if (kill(sid, SIGINT) == -1) {
+			warn("failed to shut down sink");
+			goto exit;
+		}
+	}
+	if (waitpid(sid, &s, 0) == -1) {
+		warn("failed to randezvous with sink");
+	} else if (!WIFEXITED(s)) {
+		warn("sink process terminated");
+	} else if (WEXITSTATUS(s)) {
+		warn("sink process failed (%d)", (int)WEXITSTATUS(s));
+	}
+	e = 0;
+ exit:
+	return e;
 }
 
 void