/*
 * Copyright (c) Likewise Software.  All rights Reserved.
 *
 * This library is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the license, or (at
 * your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
 * General Public License for more details.  You should have received a copy
 * of the GNU Lesser General Public License along with this program.  If
 * not, see <http://www.gnu.org/licenses/>.
 *
 * LIKEWISE SOFTWARE MAKES THIS SOFTWARE AVAILABLE UNDER OTHER LICENSING
 * TERMS AS WELL.  IF YOU HAVE ENTERED INTO A SEPARATE LICENSE AGREEMENT
 * WITH LIKEWISE SOFTWARE, THEN YOU MAY ELECT TO USE THE SOFTWARE UNDER THE
 * TERMS OF THAT SOFTWARE LICENSE AGREEMENT INSTEAD OF THE TERMS OF THE GNU
 * LESSER GENERAL PUBLIC LICENSE, NOTWITHSTANDING THE ABOVE NOTICE.
IF YOU * HAVE QUESTIONS, OR WISHTO REQUEST A COPY OF THE ALTERNATE LICENSING * TERMS OFFERED BY LIKEWISE SOFTWARE, PLEASE CONTACT LIKEWISE SOFTWARE AT * license@likewisesoftware.com */ /* * Module Name: * * threadpool-internal.h * * Abstract: * * Thread pool API * * Authors: Brian Koropoff (bkoropoff@likewise.com) * */ #ifndef __LWBASE_THREADPOOL_INTERNAL_H__ #define __LWBASE_THREADPOOL_INTERNAL_H__ #include #include #include #include #include #define TASK_COMPLETE_MASK 0xFFFFFFFF typedef struct _RING { struct _RING* pPrev; struct _RING* pNext; } RING, *PRING; typedef struct _EPOLL_THREAD { PLW_THREAD_POOL pPool; pthread_t Thread; pthread_mutex_t Lock; pthread_cond_t Event; int SignalFds[2]; int EpollFd; RING Tasks; /* Thread load (protected by thread pool lock) */ ULONG volatile ulLoad; BOOLEAN volatile bSignalled; BOOLEAN volatile bShutdown; } EPOLL_THREAD, *PEPOLL_THREAD; typedef struct _WORK_ITEM_THREAD { PLW_THREAD_POOL pPool; pthread_t Thread; } WORK_ITEM_THREAD, *PWORK_ITEM_THREAD; typedef struct _LW_TASK { /* Owning thread */ PEPOLL_THREAD pThread; /* Owning group */ PLW_TASK_GROUP pGroup; /* Ref count (protected by thread lock) */ ULONG volatile ulRefCount; /* Events task is waiting for (owned by thread) */ LW_TASK_EVENT_MASK EventWait; /* Last set of events task waited for (owned by thread) */ LW_TASK_EVENT_MASK EventLastWait; /* Events that will be passed on the next wakeup (owned by thread) */ LW_TASK_EVENT_MASK EventArgs; /* Event conditions signalled between owning thread and external callers (protected by thread lock) */ LW_TASK_EVENT_MASK volatile EventSignal; /* Absolute time of next time wake event (owned by thread) */ LONG64 llDeadline; /* Callback function and context (immutable) */ LW_TASK_FUNCTION pfnFunc; PVOID pFuncContext; /* File descriptor for fd-based events (owned by thread) */ int Fd; /* Link to siblings in task group (protected by group lock) */ RING GroupRing; /* Link to siblings in scheduler queue (owned by thread) */ RING 
QueueRing; /* Link to siblings in signal queue (protected by thread lock) */ RING SignalRing; } EPOLL_TASK, *PEPOLL_TASK; typedef struct _WORK_ITEM { LW_WORK_ITEM_FUNCTION pfnFunc; PVOID pContext; RING Ring; } WORK_ITEM, *PWORK_ITEM; typedef struct _LW_TASK_GROUP { PLW_THREAD_POOL pPool; RING Tasks; pthread_mutex_t Lock; pthread_cond_t Event; } EPOLL_TASK_GROUP, *PEPOLL_TASK_GROUP; typedef struct _LW_THREAD_POOL { ULONG ulRefCount; PEPOLL_THREAD pEventThreads; ULONG ulEventThreadCount; PWORK_ITEM_THREAD pWorkThreads; ULONG ulWorkThreadCount; RING WorkItems; BOOLEAN volatile bShutdown; pthread_mutex_t Lock; pthread_cond_t Event; } EPOLL_POOL, *PEPOLL_POOL; typedef struct _CLOCK { LONG64 llLastTime; LONG64 llAdjust; } CLOCK, *PCLOCK; /* * Lock order discipline: * * Always lock manager before locking a thread * Always lock a group before locking a thread * Always lock threads at a lower index first */ #define LOCK_THREAD(st) (pthread_mutex_lock(&(st)->Lock)) #define UNLOCK_THREAD(st) (pthread_mutex_unlock(&(st)->Lock)) #define LOCK_GROUP(g) (pthread_mutex_lock(&(g)->Lock)) #define UNLOCK_GROUP(g) (pthread_mutex_unlock(&(g)->Lock)) #define LOCK_POOL(m) (pthread_mutex_lock(&(m)->Lock)) #define UNLOCK_POOL(m) (pthread_mutex_unlock(&(m)->Lock)) /* Ring functions */ static inline VOID RingInit( PRING pRing ) { pRing->pPrev = pRing->pNext = pRing; } static inline VOID RingInsertAfter( PRING pAnchor, PRING pElement ) { pElement->pNext = pAnchor->pNext; pElement->pPrev = pAnchor; pAnchor->pNext->pPrev = pElement; pAnchor->pNext = pElement; } static inline VOID RingInsertBefore( PRING pAnchor, PRING pElement ) { pElement->pNext = pAnchor; pElement->pPrev = pAnchor->pPrev; pAnchor->pPrev->pNext = pElement; pAnchor->pPrev = pElement; } static inline VOID RingRemove( PRING pElement ) { pElement->pPrev->pNext = pElement->pNext; pElement->pNext->pPrev = pElement->pPrev; RingInit(pElement); } static inline VOID RingEnqueue( PRING pAnchor, PRING pElement ) { RingInsertBefore(pAnchor, 
pElement); } static inline VOID RingDequeue( PRING pAnchor, PRING* pElement ) { *pElement = pAnchor->pNext; RingRemove(*pElement); } static inline VOID RingMove( PRING pFrom, PRING pTo ) { PRING pFromFirst = pFrom->pNext; PRING pFromLast = pFrom->pPrev; PRING pToLast = pTo->pPrev; if (pFrom->pNext != pFrom) { pToLast->pNext = pFromFirst; pFromFirst->pPrev = pToLast; pFromLast->pNext = pTo; pTo->pPrev = pFromLast; pFrom->pNext = pFrom->pPrev = pFrom; } } static inline size_t RingCount( PRING ring ) { PRING iter = NULL; size_t count = 0; for (iter = ring->pNext; iter != ring; iter = iter->pNext, count++); return count; } static inline BOOLEAN RingIsEmpty( PRING ring ) { return ring->pNext == ring; } /* Time functions */ static inline NTSTATUS TimeNow( PLONG64 pllNow ) { struct timeval tv; if (gettimeofday(&tv, NULL)) { return LwErrnoToNtStatus(errno); } else { *pllNow = tv.tv_sec * 1000000000ll + tv.tv_usec * 1000ll; return STATUS_SUCCESS; } } static inline NTSTATUS ClockUpdate( PCLOCK pClock ) { NTSTATUS status = STATUS_SUCCESS; LONG64 llNow = 0; status = TimeNow(&llNow); GOTO_ERROR_ON_STATUS(status); if (pClock->llLastTime == 0) { pClock->llAdjust = -llNow; } else if (llNow <= pClock->llLastTime) { pClock->llAdjust += (pClock->llLastTime - llNow + 1); } pClock->llLastTime = llNow; error: return status; } static inline NTSTATUS ClockGetMonotonicTime( PCLOCK pClock, PLONG64 pllTime ) { NTSTATUS status = STATUS_SUCCESS; status = ClockUpdate(pClock); GOTO_ERROR_ON_STATUS(status); *pllTime = pClock->llLastTime + pClock->llAdjust; error: return status; } #endif