123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630 |
- /*
- * Copyright (c) 2008-2009 Nokia Corporation and/or its subsidiary(-ies).
- * All rights reserved.
- * This component and the accompanying materials are made available
- * under the terms of the License "Eclipse Public License v1.0"
- * which accompanies this distribution, and is available
- * at the URL "http://www.eclipse.org/legal/epl-v10.html".
- *
- * Initial Contributors:
- * Nokia Corporation - initial contribution.
- *
- * Contributors:
- *
- * Description:
- *
- */
- /*
- descramble.cpp:
- Description
- -----------
- "descramble" is a program that buffers standard input until end of file
- and then copies the buffers to the standard output in such a way that
- if there are many copies of descramble running, each will be given an
- opportunity to write the complete contents of its buffers while the
- others wait.
- Purpose
- -------
- descramble is used to ensure that output from multiple concurrent processes
- is not interleaved. It is useful in build systems such as GNU make with
- the -j switch, although it requires that makefiles are changed before it can
- work.
- Design
- ------
- This program may be built on Linux or on Windows. On both platforms the
- basic behavior is similar:
- 1) Read standard input into buffers. Allocate these dynamically
- so that there is no limit.
- 2) Wait on a named or system-wide semaphore.
- 3) Output all the buffers to stdout.
- The name of the semaphore is a parameter and should be chosen so that multiple
- instances of the build system (or whatever process is running many tasks with
- descramble) cannot block each other.
- Special Considerations for Linux
- --------------------------------
- A named semaphore is a file in the filesystem. It is not automatically removed
- when a process exits. descramble provides a "stop" parameter to be run at the
- "end" of the build system (or other process) that will clean away this semaphore.
- Special Considerations for Windows
- ----------------------------------
- The windows implementation is built with the MINGW toolchain. On windows
- problems have been noticed that require a fairly complex timeout capability
- such that descramble will not wait "forever" for output from stdin.
- This solution currently uses a "kill thread" that will stop descramble if
- it is blocked in a read on stdio for more than the timeout period.
- The "kill thread" attempts to print as much of the input from stdin as has
- been read so far. It scans this for XML characters and escapes them, finally
- printing its own XML-formatted error message with the escaped version of the
- input between <descramble> tags.
- */
- #include <stdio.h>
- #include <vector>
- #include <ctype.h>
- // what size chunks to allocate/read
- const int BufferSize = 4096;
- unsigned int globalreadcounter = 0;
- // OS specific headers
- #ifdef WIN32
- #include <windows.h>
- #include <tlhelp32.h>
- #include <fcntl.h> /* _O_BINARY */
- #else
- #include <stdlib.h>
- #include <fcntl.h>
- #include <semaphore.h>
- #include <sys/types.h>
- #include <signal.h>
- #include <string.h>
- #endif
- #include <unistd.h>
- #define DEFAULT_TIMEOUT (600000) // 10 minutes
- #define GOOD_OUTPUT 1
- #define BAD_OUTPUT 0
- typedef struct
- {
- char *name;
- #ifdef WIN32
- HANDLE handle;
- #else
- sem_t *handle;
- #endif
- unsigned int timeout;
- } sbs_semaphore;
- // readstate is a flag to indicate whether descramble is reading
- // it's standard input.
- // This allows the timeout thread on Windows to only cause the
- // process to exit if there is an overdue read operation on the
- // standard input.
- int readstate=1;
- int timeoutstate=0;
- // The output semaphore.
- sbs_semaphore sem;
- #ifdef WIN32
- HANDLE bufferMutex;
- // Make all output handling binary
- unsigned int _CRT_fmode = _O_BINARY;
- DWORD killPIDTree = 0;
- #else
- pid_t killPIDTree = 0;
- #endif
- // Where we store all the standard input.
- std::vector<char*> buffers;
- std::vector<int> bytesIn;
- void error(const char *reason, char *SEM_NAME)
- {
- fprintf(stderr, "<descrambler reason='%s' semaphore='%s' />\n", reason, SEM_NAME);
- exit(1);
- }
- #ifdef WIN32
- void killProcess(DWORD pid)
- {
- HANDLE proc = OpenProcess(PROCESS_TERMINATE,0,pid);
- if (proc)
- {
- TerminateProcess(proc, 1);
- //fprintf(stdout,"sent terminate to process=%d\n", pid);
- CloseHandle(proc);
- }
- else
- {
- //fprintf(stderr,"Can't open process=%d\n", pid);
- }
- }
- typedef struct {
- DWORD PID;
- DWORD PPID;
- } proc;
- void killProcessTreeRecursively(DWORD PPID, DWORD thisPID, std::vector<proc *> &processtree)
- {
- int idx;
- for (idx=0; idx < processtree.size(); idx++)
- {
- if (processtree[idx]->PID != thisPID && processtree[idx]->PPID == PPID)
- {
- killProcessTreeRecursively(processtree[idx]->PID, thisPID, processtree);
- //fprintf(stderr,"Found descendant =%d\n",processtree[idx]->PID );
- }
- }
- killProcess(PPID);
- }
- int killProcessTree(DWORD PPID)
- {
- HANDLE hSnapShot;
- DWORD thisProcID=0;
- BOOL ok;
- PROCESSENTRY32 pe;
- std::vector<proc *> processtree;
- thisProcID = GetCurrentProcessId();
- hSnapShot=CreateToolhelp32Snapshot( TH32CS_SNAPPROCESS, 0 );
- // Put all this process information into an array;
- ok = Process32First(hSnapShot, &pe);
- while (ok)
- {
- if ( pe.th32ProcessID != thisProcID)
- {
- proc *p = new proc;
- p->PID = pe.th32ProcessID;
- p->PPID = pe.th32ParentProcessID;
- processtree.push_back(p);
- //fprintf(stderr,"Found process =%d\n", pe.th32ProcessID );
- }
- ok = Process32Next(hSnapShot, &pe);
- }
- killProcessTreeRecursively(PPID, thisProcID, processtree);
- CloseHandle(hSnapShot);
- //fprintf(stderr,"Ending killproc\n", PPID);
- return 0;
- }
- #endif
- void createDescrambleSemaphore(sbs_semaphore *s)
- {
- #ifdef WIN32
- s->handle = CreateSemaphore(NULL, 1, 1, s->name);
- if (s->handle)
- CloseHandle(s->handle);
- else
- error("unable to create semaphore", s->name);
- #else
- s->handle = sem_open(s->name, O_CREAT | O_EXCL, 0644, 1);
- if (s->handle == SEM_FAILED)
- {
- sem_close(s->handle);
- error("unable to create semaphore", s->name);
- }
- sem_close(s->handle);
- #endif
- }
- void destroyDescrambleSemaphore(sbs_semaphore *s)
- {
- #ifdef WIN32
- /* can't destroy a windows semaphore... */
- #else
- if (sem_unlink(s->name) != 0)
- error("unable to unlink semaphore", s->name);
- #endif
- }
- int waitForOutput(sbs_semaphore *s)
- {
- /* try and open the semaphore now */
- #ifdef WIN32
- s->handle = CreateSemaphore(NULL, 1, 1, s->name);
- if (!s->handle)
- error("unable to open semaphore", s->name);
- #else
- s->handle = sem_open(s->name, 0);
- if (s->handle == SEM_FAILED)
- {
- sem_close(s->handle);
- error("unable to open semaphore", s->name);
- }
- #endif
- /* wait for the semaphore to be free [timeout after 60 seconds] */
- int timedOutFlag = 0;
- #ifdef WIN32
- timedOutFlag = (WaitForSingleObject(s->handle, s->timeout) != WAIT_OBJECT_0);
- #else
- sem_wait(s->handle);
- #endif
- return timedOutFlag;
- }
- void releaseOutput(sbs_semaphore *s)
- {
- /* release the semaphore */
- #ifdef WIN32
- ReleaseSemaphore(s->handle, 1, NULL);
- #else
- sem_post(s->handle);
- #endif
- /* clean up */
- #ifdef WIN32
- CloseHandle(s->handle);
- #else
- sem_close(s->handle);
- #endif
- }
- void writeBuffers(int goodoutput)
- {
- /* write stdin buffers to stdout. If the output comes
- from a partially-complete command then make sure that there
- is no malformed xml inside by escaping it. */
- char *escaped_output=NULL;
- #ifdef WIN32
- DWORD dwWaitResult = WaitForSingleObject(
- bufferMutex,
- INFINITE);
- #endif
- for (int i = 0; i < buffers.size(); i++)
- {
- int bytes_out;
- char *outbuf;
- if (goodoutput != GOOD_OUTPUT)
- {
- if (!escaped_output)
- escaped_output = new char[BufferSize*4];
- if (!escaped_output)
- error("No memory for escaped outputbuffer.",sem.name);
- char *buf = buffers[i];
- bytes_out = 0;
- for (int idx=0; idx < bytesIn[i]; idx++)
- {
- switch (buf[idx])
- {
- case '&':
- escaped_output[bytes_out++]='&';
- escaped_output[bytes_out++]='a';
- escaped_output[bytes_out++]='m';
- escaped_output[bytes_out++]='p';
- escaped_output[bytes_out++]=';';
- break;
- case '<':
- escaped_output[bytes_out++]='&';
- escaped_output[bytes_out++]='l';
- escaped_output[bytes_out++]='t';
- escaped_output[bytes_out++]=';';
- break;
- case '>':
- escaped_output[bytes_out++]='&';
- escaped_output[bytes_out++]='g';
- escaped_output[bytes_out++]='t';
- escaped_output[bytes_out++]=';';
- break;
- default:
- if (!iscntrl(buf[idx]) || buf[idx] == '\n' || buf[idx] == '\r')
- escaped_output[bytes_out++]=buf[idx];
- break;
- }
- }
- outbuf = escaped_output;
- } else {
- outbuf = buffers[i];
- bytes_out = bytesIn[i];
- }
- fwrite(outbuf, 1, bytes_out, stdout);
- }
- #ifdef WIN32
- ReleaseMutex(bufferMutex);
- #endif
- if (escaped_output)
- delete escaped_output;
- fflush(stdout);
- }
- #ifdef WIN32
- /*
- A Thread that kills this process if it is "stuck" in a read operation
- for longer than the timeout period.
- There are some race conditions here that don't matter. e.g. globalreadcounter
- is not protected. This might result in an "unfair" timeout but we don't care
- because the timeout should be pretty long and anything that's even nearly
- a timeout deserves to be timed out.
- Additionally, if the timeout is so quick that this function starts before the first
- ever read has happened then there would be a premature timeout. This is not likely
- so we also dont' care - the timeout has a minimum value which is more than long
- enough (500msec) to deal with that.
- */
- DWORD descrambleKillerThread(void * param)
- {
- sbs_semaphore *s;
- unsigned int stored_globalreadcounter;
- s = (sbs_semaphore *)param;
- fflush(stderr);
- //fprintf(stdout, " timeout=%u sem_name='%s' \n", s->timeout, s->name);
- do
- {
- stored_globalreadcounter = globalreadcounter;
- Sleep(s->timeout);
- }
- while (globalreadcounter != stored_globalreadcounter);
- if (waitForOutput(s) != 0)
- {
- fprintf(stdout, "<descrambler reason='semaphore wait exceeded %ums timeout' semaphore='%s' />\n", s->timeout, s->name);
- ExitProcess(3);
- }
- if (readstate)
- {
- fprintf(stdout, "<descrambler reason='command output read exceeded %ums timeout' semaphore='%s'>\n", s->timeout, s->name);
- writeBuffers(BAD_OUTPUT);
- fprintf(stdout, "</descrambler>\n");
- fflush(stdout);
- if (killPIDTree != 0)
- killProcessTree(killPIDTree); // Make sure peers and parents all die. Nasty
- ExitProcess(2);
- }
- else
- {
- writeBuffers(GOOD_OUTPUT);
- }
- // Don't release the semaphore in case the main thread
- // gets it and tries to write the output.
- // Input process finished while we were waiting
- // for the semaphore so a false alarm.
- fflush(stdout);
- ExitProcess(0);
- }
- #endif
- int main(int argc, char *argv[])
- {
- char usage[]="usage: %s [-t timeout_millisecs] [ -k kill_PID_tree_on_fail ] buildID [start | stop]\nwhere timeout_millisecs >= 500\n";
- int opt_res;
- char options[]="t:k:";
- sem.timeout = DEFAULT_TIMEOUT;
- opt_res = getopt(argc, argv, options);
- while (opt_res != -1 )
- {
- switch (opt_res)
- {
- case 'k':
- if (!optarg)
- {
- fprintf(stderr, "PID argument required for 'kill PID tree on fail' option\n");
- fprintf(stderr, usage, argv[0]);
- exit(1);
- }
- killPIDTree = atol(optarg);
- if (killPIDTree == 0)
- {
- fprintf(stderr, usage, argv[0]);
- fprintf(stderr, "kill PID tree on fail must be > 0: %u\n", killPIDTree);
- exit(1);
- }
- break;
- case 't':
- if (!optarg)
- {
- fprintf(stderr, "timeout argument required for timeout option\n");
- fprintf(stderr, usage, argv[0]);
- exit(1);
- }
- sem.timeout = atoi(optarg);
- if (sem.timeout < 500)
- {
- fprintf(stderr, usage, argv[0]);
- fprintf(stderr, "timeout was too low: %u\n", sem.timeout);
- exit(1);
- }
- break;
- case '?':
- default:
- fprintf(stderr, usage, argv[0]);
- fprintf(stderr, "Unknown option. %s\n", opterr);
- exit(1);
- break;
- }
- opt_res = getopt(argc, argv, options);
- }
- if (optind >= argc)
- {
- fprintf(stderr, usage, argv[0]);
- fprintf(stderr, "Missing buildID\n");
- exit(1);
- }
- sem.name = argv[optind];
- if (optind + 1 < argc)
- {
- optind++;
- if (strncmp(argv[optind], "stop",4) == 0)
- destroyDescrambleSemaphore(&sem);
- else if (strncmp(argv[optind],"start",5) == 0)
- createDescrambleSemaphore(&sem);
- else
- {
- fprintf(stderr, usage, argv[0]);
- fprintf(stderr, "Unknown argument:: %s\n", argv[optind]);
- exit(1);
- }
- exit(0);
- }
- #ifdef WIN32
- HANDLE hStdin = GetStdHandle(STD_INPUT_HANDLE);
- bufferMutex = CreateMutex(NULL, FALSE, NULL);
- /*
- HANDLE WINAPI CreateThread(
- __in_opt LPSECURITY_ATTRIBUTES lpThreadAttributes,
- __in SIZE_T dwStackSize,
- __in LPTHREAD_START_ROUTINE lpStartAddress,
- __in_opt LPVOID lpParameter,
- __in DWORD dwCreationFlags,
- __out_opt LPDWORD lpThreadId
- ); */
- DWORD killerThreadId;
- HANDLE hKillerThread;
- hKillerThread = CreateThread(NULL, 4096, (LPTHREAD_START_ROUTINE) descrambleKillerThread, (void*)&sem, 0, &killerThreadId);
- #endif
- /* read all of my stdin into buffers */
- int bytesRead = 0;
- int bufferIndex = 0;
- do
- {
- char *buffer = new char[BufferSize];
- if (buffer == NULL)
- error("not enough memory for buffer", sem.name);
- // Add an empty buffer in advance so that if there is a timeout
- // the partial command result can be gathered.
- #ifdef WIN32
- DWORD dwWaitResult = WaitForSingleObject(
- bufferMutex,
- INFINITE);
- #endif
- buffers.push_back(buffer);
- bytesIn.push_back(0);
- int *counter = &bytesIn[bufferIndex];
- #ifdef WIN32
- ReleaseMutex(bufferMutex);
- #endif
- // Empty buffer added.
- char c = fgetc(stdin);
- //fprintf(stderr, "counter %d buffersize %d\n", *counter, BufferSize);
- do
- {
- if (c == EOF)
- break;
- // escape unprintable characters that might make logs unparsable.
- if (iscntrl(c) && !isspace(c))
- c = '_';
- buffer[*counter] = c;
- *counter += 1;
- if (*counter >= BufferSize)
- break;
- c = fgetc(stdin);
- globalreadcounter = ++globalreadcounter % 65535*4;
- }
- while (c != EOF);
- //fprintf(stderr, "## %d bytesin %d\n", bufferIndex, *counter);
- bufferIndex++;
- }
- while (!feof(stdin) && !timeoutstate);
- readstate = 0; // Tell the killerthread that it can back off.
- int timedout;
- timedout = waitForOutput(&sem);
- if (timedout)
- error("timed out waiting for semaphore", sem.name);
- else
- {
- writeBuffers(1);
- }
- releaseOutput(&sem);
- /* let the OS free the buffer memory */
- exit(0);
- }
|