/* More include madness. This is such good stuff, that I can't resist */ /* just "#include"ing it into all the mpmy_??io.c files...*/ /* Try to use different tags for the different kinds of broadcasts... */ #define BCAST_CLOSE 0x3579 #define BCAST_OPEN 0x3779 #define BCAST_READ1 0x3979 #define BCAST_READ2 0x3b79 #define BCAST_WRITE 0x3d79 #define BCAST_LSEEK 0x3f79 #define BCAST_TELL 0x4179 #define BCAST_FLEN 0x4379 #define BCAST_MKDIR 0x4579 static int open0(const char *path, int flags, int mode) { int ret; /* paragon left the const out of the prototype */ Msgf(("open0(path=%s, flags=%#x, mode=%#x\n", path, flags, mode)); if (MPMY_Procnum() == 0){ ret = open((char *)path, flags, mode); Msgf(("open returns %d on node 0\n", ret)); } MPMY_BcastTag(&ret, 1, MPMY_INT, 0, BCAST_OPEN); Msgf(("open0 returning %d\n", ret)); return ret; } static int close0(int fd) { int ret; if (MPMY_Procnum() == 0) ret = close(fd); MPMY_BcastTag(&ret, 1, MPMY_INT, 0, BCAST_CLOSE); return ret; } static long read0(int fd, void *buf, unsigned long nbytes) { long ret; if (MPMY_Procnum() == 0){ ret = read(fd, buf, nbytes); Msgf(("read0: read(fd=%d, nbytes=%ld) returns %ld\n", fd, nbytes, ret)); } MPMY_BcastTag(&ret, 1, MPMY_LONG, 0, BCAST_READ1); Msgf(("read0: after Bcast ret = %ld\n", ret)); if( ret > 0 ) MPMY_BcastTag(buf, ret, MPMY_CHAR, 0, BCAST_READ2); if( Msg_test(__FILE__)){ int i; int sum = 0; char *cbuf = buf; for(i=0; i 0) { off_t nwrite = (nbytes > MAX_IOBUF) ? MAX_IOBUF : nbytes; off_t wrote; wrote = write(fd, buf+ret, nwrite); if (wrote != nwrite) Shout("write failed, errno=%d\n", errno); ret += wrote; nbytes -= wrote; } Msgf(("write0 %ld\n", ret)); if (nproc > 1) { tmpbuf = Malloc(nbytes); for (i = procnum+1; i < nproc; i++) { /* This could be double-buffered */ tmpbuf = Realloc(tmpbuf, sizes[i]); /* Avoid deluge of messages by sync */ MPMY_send(&sync, sizeof(int), i, MPMY_IOTAG); MPMY_recvn(tmpbuf, sizes[i], i, MPMY_IOTAG); n = write(fd, tmpbuf, sizes[i]); if (n != sizes[i]) Shout("write failed, errno=%d\n", errno); Msgf(("write%d %d\n", i, n)); /* should we send errno as well? */ MPMY_send(&n, sizeof(int), i, MPMY_IOTAG); } Free(tmpbuf); } Free(sizes); } else { MPMY_Gather(&nbytes, 1, MPMY_OFFT, NULL, 0); MPMY_recvn(&sync, sizeof(int), 0, MPMY_IOTAG); Msgf(("sending %ld bytes\n", nbytes)); MPMY_send(buf, nbytes, 0, MPMY_IOTAG); MPMY_recvn(&ret, sizeof(int), 0, MPMY_IOTAG); } return ret; } static long read0_multi(int fd, void *buf, off_t nbytes) { int ret; off_t *sizes; int i, n; char *tmpbuf; int nproc = MPMY_Nproc(); int procnum = MPMY_Procnum(); if (sizeof(off_t) != sizeof(long long)) { Error("Type problem in write0_multi\n"); } if (procnum == 0) { sizes = Malloc(sizeof(off_t)*nproc); MPMY_Gather(&nbytes, 1, MPMY_OFFT, sizes, 0); ret = read(fd, buf, nbytes); if (ret != nbytes) Shout("read failed, errno=%d\n", errno); Msgf(("read0 %d\n", ret)); if (nproc > 1) { tmpbuf = Malloc(nbytes); for (i = procnum+1; i < nproc; i++) { /* This could be double-buffered */ tmpbuf = Realloc(tmpbuf, sizes[i]); n = read(fd, tmpbuf, sizes[i]); MPMY_send(tmpbuf, n, i, MPMY_IOTAG); if (n != sizes[i]) Shout("read failed, errno=%d\n", errno); Msgf(("read%d %d\n", i, n)); } Free(tmpbuf); } Free(sizes); } else { MPMY_Status stat; MPMY_Comm_request req; MPMY_Gather(&nbytes, 1, MPMY_OFFT, NULL, 0); MPMY_Irecv(buf, nbytes, 0, MPMY_IOTAG, &req); MPMY_Wait(req, &stat); ret = MPMY_Count(&stat); } return ret; }