123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- /*-------------------------------------------------------------------------
- allegdb.cpp
-
- Implementation of OLE DB layer for Allegiance
-
- Owner:
-
- Copyright 1986-2000 Microsoft Corporation, All Rights Reserved
- *-----------------------------------------------------------------------*/
- #include "pch.h"
- #include "allegdb.h"
- unsigned CALLBACK SQLQueueProc(void * pvSQLQueueThread)
- {
- CSQLQueueThread * pSQLQueueThread = (CSQLQueueThread *) pvSQLQueueThread;
- HANDLE rgeventHandles[1];
- rgeventHandles[0] = pSQLQueueThread->GetEventDie();
- DWORD cHandles = sizeof(rgeventHandles) / sizeof(rgeventHandles[0]);
- DWORD dwWait = WAIT_TIMEOUT;
- do
- {
- //Wait until either we get a message or we ran out of time
- dwWait = MsgWaitForMultipleObjects(cHandles, rgeventHandles, FALSE, INFINITE, QS_ALLINPUT);
- // Process the message queue, if any messages were received
- static MSG msg;
- while (WAIT_OBJECT_0 != dwWait && PeekMessage(&msg, NULL, 0, 0, PM_REMOVE))
- {
- // dispatch Windows Messages to allow for the admin tool's COM to work
- TranslateMessage(&msg);
- switch (msg.message)
- {
- case wm_sql_addquery:
- {
- pSQLQueueThread->AddQuery((CSQLQuery*) msg.lParam);
- break;
- }
-
- case WM_QUIT:
- dwWait = WAIT_OBJECT_0; // let the thread be shutdown by sending a quit, or signalling pSqlThread->m_hEventExit
- break;
- default:
- ZError("SQLThreadProc: Unexpected thread message.\n");
- }
- }
- } while (WAIT_OBJECT_0 != dwWait);
-
- return 0;
- }
- unsigned CALLBACK SQLThreadProc(void * pvsqlThread)
- {
- CSQLThread * pSqlThread = (CSQLThread *) pvsqlThread;
- CSQLCore * psql = pSqlThread->GetSQLCore();
- HRESULT hr = E_FAIL;
- hr = pSqlThread->Open();
- if (FAILED(hr))
- return hr;
-
- HANDLE rgeventHandles[2];
- rgeventHandles[0] = pSqlThread->GetEventDie();
- rgeventHandles[1] = pSqlThread->GetNotify() ? psql->GetNotifySemaphore() : psql->GetSilentSemaphore();
- DWORD cHandles = sizeof(rgeventHandles) / sizeof(rgeventHandles[0]);
- DWORD dwWait = WAIT_TIMEOUT;
- do
- {
- //Wait until either we get a message or we ran out of time
- dwWait = WaitForMultipleObjects(cHandles, rgeventHandles, FALSE, INFINITE);
- switch (dwWait)
- {
- case WAIT_OBJECT_0: // pSqlThread->m_hEventExit
- break;
- case WAIT_OBJECT_0 + 1: // spemaphore for the queue we're servicing
- // We may not actually get the query that we were signaled for, but that doesn't matter
- hr = pSqlThread->ServiceQuery(pSqlThread->GetNotify() ? psql->GetNotifyQuery() : psql->GetSilentQuery());
- break;
- default:
- ZError("Unexpected object signaled in SQLThreadProc.\n");
- }
- } while (WAIT_OBJECT_0 != dwWait);
- return 0;
- }
- HRESULT CSQLCore::Init(LPCOLESTR strSQLConfig, DWORD nThreadIDNotify, DWORD cSilentThreads,
- DWORD cNotifyThreads)
- {
- HRESULT hr = E_FAIL;
- if (cSilentThreads + cNotifyThreads < 1 ||
- cSilentThreads + cNotifyThreads > 20) // must have between 1 and 20 threads
- return E_INVALIDARG;
- m_nThreadIDNotify = nThreadIDNotify;
- m_cSilentThreads = cSilentThreads;
- m_cNotifyThreads = cNotifyThreads;
- // create event used to signal that ALL sql threads should exit
- m_hKillSQLEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
- assert(m_hKillSQLEvent);
- hr = m_connection.OpenFromInitializationString(strSQLConfig);
- if (FAILED(hr))
- {
- DumpErrorInfo(m_connection.m_spInit, IID_IDBInitialize, NULL);
- return hr;
- }
- m_hQueryNotify = CreateSemaphore(NULL, 0, MAXLONG, NULL);
- m_hQuerySilent = CreateSemaphore(NULL, 0, MAXLONG, NULL);
- m_pthdQueue = new CSQLQueueThread(m_hKillSQLEvent, this);
- assert (m_pthdQueue);
- m_pargSilentThreads = (CSQLThread **) new char[sizeof(CSQLThread *) * m_cSilentThreads];
- m_pargNotifyThreads = (CSQLThread **) new char[sizeof(CSQLThread *) * m_cNotifyThreads];
- DWORD i = 0;
- for (i = 0; i < m_cNotifyThreads; i++)
- {
- m_pargNotifyThreads[i] = new CSQLThread(m_hKillSQLEvent, true, this);
- }
- for (i = 0; i < m_cSilentThreads; i++)
- {
- m_pargSilentThreads[i] = new CSQLThread(m_hKillSQLEvent, false, this);
- }
- return hr;
- }
- CSQLCore::~CSQLCore()
- {
- DWORD i = 0;
- for (i = 0; i < m_cNotifyThreads && m_pargNotifyThreads; i++)
- {
- // TODO: cleanup m_listQuries
- delete m_pargNotifyThreads[i];
- }
- delete [] m_pargNotifyThreads;
- for (i = 0; i < m_cSilentThreads && m_pargSilentThreads; i++)
- {
- // TODO: cleanup m_listQuries
- delete m_pargSilentThreads[i];
- }
- delete [] m_pargSilentThreads;
- CloseHandle(m_hQueryNotify);
- CloseHandle(m_hQuerySilent);
- }
- void CSQLCore::AddQuery(CSQLQuery * pquery)
- {
- HANDLE h = NULL;
- if (pquery->GetNotify())
- {
- m_listQueriesNotify.last(pquery);
- h = GetNotifySemaphore();
- }
- else
- {
- m_listQueriesSilent.last(pquery);
- h = GetSilentSemaphore();
- }
- ReleaseSemaphore(h, 1, NULL); // signal some thread to pick up the query
- }
- void CSQLCore::PostQuery(CSQLQuery * pquery)
- {
- PostThreadMessage(m_pthdQueue->GetThreadID(), wm_sql_addquery, (WPARAM) NULL, (LPARAM) pquery);
- }
- HRESULT CSQLThread::ServiceQuery(CSQLQuery * pqueryNew)
- {
- CSQLQuery * pqueryCache = NULL;
- REFGUID guid = pqueryNew->GetGuid();
- HRESULT hrQuery = E_FAIL; // it is important that ONLY results from calls to pqueryCache be assigned here
- // See if this query is already in the per-thread cache
- for (SLinkQueries * plinkQuery = m_listQueries.first(); plinkQuery && !pqueryCache; plinkQuery = plinkQuery->next())
- {
- CSQLQuery * pqueryT = plinkQuery->data();
- if (pqueryT->GetGuid() == guid)
- {
- // make sure we didn't accidentally use the same guid twice for different queries
- assert(!lstrcmp(pqueryNew->GetStrQuery(), pqueryT->GetStrQuery()));
- pqueryCache = pqueryT;
- }
- }
- if (!pqueryCache) // the first time this thread has seen this query, so let's add it to our reportoire
- {
- //pqueryCache = pqueryNew;
- pqueryCache = pqueryNew->Copy(NULL);
- m_listQueries.first(pqueryCache);
- hrQuery = pqueryCache->OnPrepare(m_session);
- }
- else
- {
- hrQuery = S_OK;
- pqueryNew->Copy(pqueryCache);
- }
- bool fRetry = SUCCEEDED(hrQuery);
- int cRetries = 0;
- while (fRetry)
- {
- hrQuery = pqueryCache->OnExecute();
- if (SUCCEEDED(hrQuery))
- {
- fRetry = false;
- }
- else
- {
- m_psql->DumpErrorInfo(pqueryCache->GetIUnknown(), IID_ICommand, &fRetry);
- if (fRetry)
- {
- debugf("Query deadlocked or timed-out. Retry #%d\n", ++cRetries);
- Sleep(50 * cRetries);
- }
- }
- }
- if (FAILED(hrQuery))
- m_psql->DumpErrorInfo(pqueryCache->GetIUnknown(), IID_ICommand, NULL);
- pqueryCache->Copy(pqueryNew);
- // We post it whether the client requested notification or not, because at least we need to let the query clean up itself
- if (pqueryNew->GetCallbackOnMainThread())
- {
- BOOL bPosted = PostThreadMessage(GetSQLCore()->GetNotifyThreadID(), wm_sql_querydone, (WPARAM) NULL, (LPARAM) pqueryNew);
- if (!bPosted)
- {
- // If the PostThreadMessage failed because the thread id is gone, we still need to clean up
- DWORD dwLastError = ::GetLastError();
- if (ERROR_INVALID_THREAD_ID == dwLastError)
- {
- pqueryNew->DataReady();
- }
- }
- }
- else
- {
- pqueryNew->DataReady();
- }
- return hrQuery;
- }
|