diff --git a/qemud/event.c b/qemud/event.c --- a/qemud/event.c +++ b/qemud/event.c @@ -28,12 +28,16 @@ #include #include #include +#include #include "qemud.h" #include "event.h" #include "memory.h" +#include "util.h" #define EVENT_DEBUG(fmt, ...) qemudDebug("EVENT: " fmt, __VA_ARGS__) + +static int virEventInterruptLocked(void); /* State for a single file handle being monitored */ struct virEventHandle { @@ -63,6 +67,9 @@ struct virEventTimeout { /* State for the main event loop */ struct virEventLoop { + pthread_mutex_t lock; + pthread_t leader; + int wakeupfd[2]; int handlesCount; int handlesAlloc; struct virEventHandle *handles; @@ -80,6 +87,16 @@ static int nextWatch = 0; /* Unique ID for the next timer to be registered */ static int nextTimer = 0; +static void virEventLock(void) +{ + pthread_mutex_lock(&eventLoop.lock); +} + +static void virEventUnlock(void) +{ + pthread_mutex_unlock(&eventLoop.lock); +} + /* * Register a callback for monitoring file handle events. * NB, it *must* be safe to call this from within a callback @@ -89,17 +106,23 @@ int virEventAddHandleImpl(int fd, int ev virEventHandleCallback cb, void *opaque, virFreeCallback ff) { + int watch; EVENT_DEBUG("Add handle %d %d %p %p", fd, events, cb, opaque); + virEventLock(); if (eventLoop.handlesCount == eventLoop.handlesAlloc) { EVENT_DEBUG("Used %d handle slots, adding %d more", eventLoop.handlesAlloc, EVENT_ALLOC_EXTENT); if (VIR_REALLOC_N(eventLoop.handles, - (eventLoop.handlesAlloc + EVENT_ALLOC_EXTENT)) < 0) + (eventLoop.handlesAlloc + EVENT_ALLOC_EXTENT)) < 0) { + virEventUnlock(); return -1; + } eventLoop.handlesAlloc += EVENT_ALLOC_EXTENT; } - eventLoop.handles[eventLoop.handlesCount].watch = nextWatch++; + watch = nextWatch++; + + eventLoop.handles[eventLoop.handlesCount].watch = watch; eventLoop.handles[eventLoop.handlesCount].fd = fd; eventLoop.handles[eventLoop.handlesCount].events = virEventHandleTypeToPollEvent(events); @@ -110,11 +133,15 @@ int virEventAddHandleImpl(int fd, int ev eventLoop.handlesCount++; - return nextWatch-1; + virEventInterruptLocked(); + virEventUnlock(); + + return watch; } void virEventUpdateHandleImpl(int watch, int events) { int i; + virEventLock(); for (i = 0 ; i < eventLoop.handlesCount ; i++) { if (eventLoop.handles[i].watch == watch) { eventLoop.handles[i].events = @@ -122,6 +149,8 @@ void virEventUpdateHandleImpl(int watch, break; } } + virEventInterruptLocked(); + virEventUnlock(); } /* @@ -133,6 +162,7 @@ int virEventRemoveHandleImpl(int watch) int virEventRemoveHandleImpl(int watch) { int i; EVENT_DEBUG("Remove handle %d", watch); + virEventLock(); for (i = 0 ; i < eventLoop.handlesCount ; i++) { if (eventLoop.handles[i].deleted) continue; @@ -140,9 +170,12 @@ int virEventRemoveHandleImpl(int watch) if (eventLoop.handles[i].watch == watch) { EVENT_DEBUG("mark delete %d %d", i, eventLoop.handles[i].fd); eventLoop.handles[i].deleted = 1; + virEventUnlock(); return 0; } } + virEventInterruptLocked(); + virEventUnlock(); return -1; } @@ -157,17 +190,21 @@ int virEventAddTimeoutImpl(int frequency void *opaque, virFreeCallback ff) { struct timeval now; + int ret; EVENT_DEBUG("Adding timer %d with %d ms freq", nextTimer, frequency); if (gettimeofday(&now, NULL) < 0) { return -1; } + virEventLock(); if (eventLoop.timeoutsCount == eventLoop.timeoutsAlloc) { EVENT_DEBUG("Used %d timeout slots, adding %d more", eventLoop.timeoutsAlloc, EVENT_ALLOC_EXTENT); if (VIR_REALLOC_N(eventLoop.timeouts, - (eventLoop.timeoutsAlloc + EVENT_ALLOC_EXTENT)) < 0) + (eventLoop.timeoutsAlloc + EVENT_ALLOC_EXTENT)) < 0) { + virEventUnlock(); return -1; + } eventLoop.timeoutsAlloc += EVENT_ALLOC_EXTENT; } @@ -183,8 +220,10 @@ int virEventAddTimeoutImpl(int frequency (((unsigned long long)now.tv_usec)/1000) : 0; eventLoop.timeoutsCount++; - - return nextTimer-1; + ret = nextTimer-1; + virEventInterruptLocked(); + virEventUnlock(); + return ret; } void virEventUpdateTimeoutImpl(int timer, int frequency) { @@ -195,6 +234,7 @@ void virEventUpdateTimeoutImpl(int timer return; } + virEventLock(); for (i = 0 ; i < eventLoop.timeoutsCount ; i++) { if (eventLoop.timeouts[i].timer == timer) { eventLoop.timeouts[i].frequency = frequency; @@ -205,6 +245,8 @@ void virEventUpdateTimeoutImpl(int timer break; } } + virEventInterruptLocked(); + virEventUnlock(); } /* @@ -216,15 +258,19 @@ int virEventRemoveTimeoutImpl(int timer) int virEventRemoveTimeoutImpl(int timer) { int i; EVENT_DEBUG("Remove timer %d", timer); + virEventLock(); for (i = 0 ; i < eventLoop.timeoutsCount ; i++) { if (eventLoop.timeouts[i].deleted) continue; if (eventLoop.timeouts[i].timer == timer) { eventLoop.timeouts[i].deleted = 1; + virEventUnlock(); return 0; } } + virEventInterruptLocked(); + virEventUnlock(); return -1; } @@ -336,10 +382,15 @@ static int virEventDispatchTimeouts(void continue; if (eventLoop.timeouts[i].expiresAt <= now) { - (eventLoop.timeouts[i].cb)(eventLoop.timeouts[i].timer, - eventLoop.timeouts[i].opaque); + virEventTimeoutCallback cb = eventLoop.timeouts[i].cb; + int timer = eventLoop.timeouts[i].timer; + void *opaque = eventLoop.timeouts[i].opaque; eventLoop.timeouts[i].expiresAt = now + eventLoop.timeouts[i].frequency; + + virEventUnlock(); + (cb)(timer, opaque); + virEventLock(); } } return 0; @@ -356,28 +407,25 @@ static int virEventDispatchTimeouts(void * * Returns 0 upon success, -1 if an error occurred */ -static int virEventDispatchHandles(struct pollfd *fds) { +static int virEventDispatchHandles(int nfds, struct pollfd *fds) { int i; - virEventHandleType hEvents; - /* Save this now - it may be changed during dispatch */ - int nhandles = eventLoop.handlesCount; - for (i = 0 ; i < nhandles ; i++) { + for (i = 0 ; i < nfds ; i++) { if (eventLoop.handles[i].deleted) { EVENT_DEBUG("Skip deleted %d", eventLoop.handles[i].fd); continue; } if (fds[i].revents) { - hEvents = virPollEventToEventHandleType(fds[i].revents); - EVENT_DEBUG("Dispatch %d %d %d %p", - eventLoop.handles[i].watch, - fds[i].fd, fds[i].revents, - eventLoop.handles[i].opaque); - (eventLoop.handles[i].cb)(eventLoop.handles[i].watch, - fds[i].fd, - hEvents, - eventLoop.handles[i].opaque); + virEventHandleCallback cb = eventLoop.handles[i].cb; + void *opaque = eventLoop.handles[i].opaque; + int hEvents = virPollEventToEventHandleType(fds[i].revents); + EVENT_DEBUG("Dispatch %d %d %p", fds[i].fd, + fds[i].revents, eventLoop.handles[i].opaque); + virEventUnlock(); + (cb)(eventLoop.handles[i].watch, + fds[i].fd, hEvents, opaque); + virEventLock(); } } @@ -472,13 +520,20 @@ int virEventRunOnce(void) { struct pollfd *fds; int ret, timeout, nfds; - if ((nfds = virEventMakePollFDs(&fds)) < 0) + virEventLock(); + eventLoop.leader = pthread_self(); + if ((nfds = virEventMakePollFDs(&fds)) < 0) { + virEventUnlock(); return -1; + } if (virEventCalculateTimeout(&timeout) < 0) { VIR_FREE(fds); + virEventUnlock(); return -1; } + + virEventUnlock(); retry: EVENT_DEBUG("Poll on %d handles %p timeout %d", nfds, fds, timeout); @@ -491,25 +546,86 @@ int virEventRunOnce(void) { VIR_FREE(fds); return -1; } + + virEventLock(); if (virEventDispatchTimeouts() < 0) { VIR_FREE(fds); + virEventUnlock(); return -1; } if (ret > 0 && - virEventDispatchHandles(fds) < 0) { + virEventDispatchHandles(nfds, fds) < 0) { VIR_FREE(fds); + virEventUnlock(); return -1; } VIR_FREE(fds); - if (virEventCleanupTimeouts() < 0) + if (virEventCleanupTimeouts() < 0) { + virEventUnlock(); + return -1; + } + + if (virEventCleanupHandles() < 0) { + virEventUnlock(); + return -1; + } + + eventLoop.leader = 0; + virEventUnlock(); + return 0; +} + +static void virEventHandleWakeup(int watch ATTRIBUTE_UNUSED, + int fd, + int events ATTRIBUTE_UNUSED, + void *opaque ATTRIBUTE_UNUSED) +{ + char c; + virEventLock(); + saferead(fd, &c, sizeof(c)); + virEventUnlock(); +} + +int virEventInit(void) +{ + if (pthread_mutex_init(&eventLoop.lock, NULL) != 0) return -1; - if (virEventCleanupHandles() < 0) + if (pipe(eventLoop.wakeupfd) < 0 || + qemudSetNonBlock(eventLoop.wakeupfd[0]) < 0 || + qemudSetNonBlock(eventLoop.wakeupfd[1]) < 0 || + qemudSetCloseExec(eventLoop.wakeupfd[0]) < 0 || + qemudSetCloseExec(eventLoop.wakeupfd[1]) < 0) + return -1; + + if (virEventAddHandleImpl(eventLoop.wakeupfd[0], + VIR_EVENT_HANDLE_READABLE, + virEventHandleWakeup, NULL, NULL) < 0) return -1; return 0; +} + +static int virEventInterruptLocked(void) +{ + char c = '\0'; + if (pthread_self() == eventLoop.leader) + return 0; + + if (safewrite(eventLoop.wakeupfd[1], &c, sizeof(c)) != sizeof(c)) + return -1; + return 0; +} + +int virEventInterrupt(void) +{ + int ret; + virEventLock(); + ret = virEventInterruptLocked(); + virEventUnlock(); + return ret; } int diff --git a/qemud/event.h b/qemud/event.h --- a/qemud/event.h +++ b/qemud/event.h @@ -101,6 +101,13 @@ int virEventRemoveTimeoutImpl(int timer) int virEventRemoveTimeoutImpl(int timer); /** + * virEventInit: Initialize the event loop + * + * returns -1 if initialization failed + */ +int virEventInit(void); + +/** * virEventRunOnce: run a single iteration of the event loop. * * Blocks the caller until at least one file handle has an @@ -116,5 +123,12 @@ virPollEventToEventHandleType(int events virPollEventToEventHandleType(int events); +/** + * virEventInterrupt: wakeup any thread waiting in poll() + * + * return -1 if wakup failed + */ +int virEventInterrupt(void); + #endif /* __VIRTD_EVENT_H__ */ diff --git a/qemud/qemud.c b/qemud/qemud.c --- a/qemud/qemud.c +++ b/qemud/qemud.c @@ -296,7 +296,7 @@ qemudDispatchSignalEvent(int watch ATTRI server->shutdown = 1; } -static int qemudSetCloseExec(int fd) { +int qemudSetCloseExec(int fd) { int flags; if ((flags = fcntl(fd, F_GETFD)) < 0) goto error; @@ -311,7 +311,7 @@ static int qemudSetCloseExec(int fd) { } -static int qemudSetNonBlock(int fd) { +int qemudSetNonBlock(int fd) { int flags; if ((flags = fcntl(fd, F_GETFL)) < 0) goto error; @@ -752,6 +752,12 @@ static struct qemud_server *qemudInitial } server->sigread = sigread; + + if (virEventInit() < 0) { + qemudLog(QEMUD_ERR, "%s", _("Failed to initialize event system")); + VIR_FREE(server); + return NULL; + } virInitialize(); diff --git a/qemud/qemud.h b/qemud/qemud.h --- a/qemud/qemud.h +++ b/qemud/qemud.h @@ -177,6 +177,9 @@ void qemudLog(int priority, const char * #define qemudDebug(fmt, ...) do {} while(0) #endif +int qemudSetCloseExec(int fd); +int qemudSetNonBlock(int fd); + unsigned int remoteDispatchClientRequest (struct qemud_server *server, struct qemud_client *client);