/* Once for each server */
LISTENING_PORTS,
NUM_THREADS,
+ PRESPAWN_THREADS,
RUN_AS_USER,
CONFIG_TCP_NODELAY, /* Prepended CONFIG_ to avoid conflict with the
* socket option typedef TCP_NODELAY. */
/* Once for each server */
{"listening_ports", MG_CONFIG_TYPE_STRING_LIST, "8080"},
{"num_threads", MG_CONFIG_TYPE_NUMBER, "50"},
+ {"prespawn_threads", MG_CONFIG_TYPE_NUMBER, "0"},
{"run_as_user", MG_CONFIG_TYPE_STRING, NULL},
{"tcp_nodelay", MG_CONFIG_TYPE_NUMBER, "0"},
{"max_request_size", MG_CONFIG_TYPE_NUMBER, "16384"},
pthread_t masterthreadid; /* The master thread ID */
unsigned int
- cfg_worker_threads; /* The number of configured worker threads. */
+ cfg_max_worker_threads; /* How many worker-threads we are allowed to create, total */
+
+ unsigned int
+ spawned_worker_threads; /* How many worker-threads currently exist (modified by master thread) */
+ unsigned int
+ idle_worker_thread_count; /* How many worker-threads are currently sitting around with nothing to do */
+ /* Access to this value MUST be synchronized by thread_mutex */
+
pthread_t *worker_threadids; /* The worker thread IDs */
unsigned long starter_thread_idx; /* thread index which called mg_start */
* timeouts, we will just wait a few seconds in mg_join_thread. */
/* join worker thread */
- for (i = 0; i < conn->phys_ctx->cfg_worker_threads; i++) {
+ for (i = 0; i < conn->phys_ctx->spawned_worker_threads; i++) {
mg_join_thread(conn->phys_ctx->worker_threadids[i]);
}
}
/* Now upgrade to ws/wss client context */
conn->phys_ctx->user_data = user_data;
conn->phys_ctx->context_type = CONTEXT_WS_CLIENT;
- conn->phys_ctx->cfg_worker_threads = 1; /* one worker thread */
+ conn->phys_ctx->cfg_max_worker_threads = 1; /* one worker thread */
+ conn->phys_ctx->spawned_worker_threads = 1; /* one worker thread */
/* Start a thread to read the websocket client connection
* This thread will automatically stop when mg_disconnect is
thread_data,
conn->phys_ctx->worker_threadids)
!= 0) {
- conn->phys_ctx->cfg_worker_threads = 0;
+ conn->phys_ctx->spawned_worker_threads = 0;
mg_free(thread_data);
mg_close_connection(conn);
conn = NULL;
#endif
}
+static int mg_start_worker_thread(struct mg_context *ctx, int only_if_no_idle_threads); /* forward declaration */
#if defined(ALTERNATIVE_QUEUE)
{
unsigned int i;
+ (void)mg_start_worker_thread(ctx, 1); /* will start a worker-thread only if there aren't currently any idle worker-threads */
+
while (!ctx->stop_flag) {
- for (i = 0; i < ctx->cfg_worker_threads; i++) {
+ for (i = 0; i < ctx->spawned_worker_threads; i++) {
/* find a free worker slot and signal it */
if (ctx->client_socks[i].in_use == 2) {
(void)pthread_mutex_lock(&ctx->thread_mutex);
static int
-consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index)
+consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index, int counter_was_preincremented)
{
DEBUG_TRACE("%s", "going idle");
(void)pthread_mutex_lock(&ctx->thread_mutex);
+ if (counter_was_preincremented == 0) { /* first call only: the master-thread pre-incremented this before he spawned us */
+ ctx->idle_worker_thread_count++;
+ }
ctx->client_socks[thread_index].in_use = 2;
(void)pthread_mutex_unlock(&ctx->thread_mutex);
}
return 0;
}
+ ctx->idle_worker_thread_count--;
(void)pthread_mutex_unlock(&ctx->thread_mutex);
if (sp->in_use == 1) {
DEBUG_TRACE("grabbed socket %d, going busy", sp->sock);
/* Worker threads take accepted socket from the queue */
static int
-consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index)
+consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index, int counter_was_preincremented)
{
(void)thread_index;
- (void)pthread_mutex_lock(&ctx->thread_mutex);
DEBUG_TRACE("%s", "going idle");
+ (void)pthread_mutex_lock(&ctx->thread_mutex);
+ if (counter_was_preincremented == 0) { /* first call only: the master-thread pre-incremented this before he spawned us */
+ ctx->idle_worker_thread_count++;
+ }
/* If the queue is empty, wait. We're idle at this point. */
while ((ctx->sq_head == ctx->sq_tail)
}
(void)pthread_cond_signal(&ctx->sq_empty);
+
+ ctx->idle_worker_thread_count--;
(void)pthread_mutex_unlock(&ctx->thread_mutex);
return STOP_FLAG_IS_ZERO(&ctx->stop_flag);
(void)pthread_cond_signal(&ctx->sq_full);
(void)pthread_mutex_unlock(&ctx->thread_mutex);
+
+ (void)mg_start_worker_thread(ctx, 1); /* will start a worker-thread only if there aren't currently any idle worker-threads */
}
#endif /* ALTERNATIVE_QUEUE */
struct mg_context *ctx = conn->phys_ctx;
int thread_index;
struct mg_workerTLS tls;
+ int first_call_to_consume_socket = 1;
mg_set_thread_name("worker");
/* Connection structure has been pre-allocated */
thread_index = (int)(conn - ctx->worker_connections);
if ((thread_index < 0)
- || ((unsigned)thread_index >= (unsigned)ctx->cfg_worker_threads)) {
+ || ((unsigned)thread_index >= (unsigned)ctx->cfg_max_worker_threads)) {
mg_cry_ctx_internal(ctx,
"Internal error: Invalid worker index %i",
thread_index);
/* Call consume_socket() even when ctx->stop_flag > 0, to let it
* signal sq_empty condvar to wake up the master waiting in
* produce_socket() */
- while (consume_socket(ctx, &conn->client, thread_index)) {
+ while (consume_socket(ctx, &conn->client, thread_index, first_call_to_consume_socket)) {
+ first_call_to_consume_socket = 0;
/* New connections must start with new protocol negotiation */
tls.alpn_proto = NULL;
/* Wakeup workers that are waiting for connections to handle. */
#if defined(ALTERNATIVE_QUEUE)
- for (i = 0; i < ctx->cfg_worker_threads; i++) {
+ for (i = 0; i < ctx->spawned_worker_threads; i++) {
event_signal(ctx->client_wait_events[i]);
}
#else
#endif
/* Join all worker threads to avoid leaking threads. */
- workerthreadcount = ctx->cfg_worker_threads;
+ workerthreadcount = ctx->spawned_worker_threads;
for (i = 0; i < workerthreadcount; i++) {
if (ctx->worker_threadids[i] != 0) {
mg_join_thread(ctx->worker_threadids[i]);
#if defined(ALTERNATIVE_QUEUE)
mg_free(ctx->client_socks);
if (ctx->client_wait_events != NULL) {
- for (i = 0; (unsigned)i < ctx->cfg_worker_threads; i++) {
+ for (i = 0; (unsigned)i < ctx->spawned_worker_threads; i++) {
event_destroy(ctx->client_wait_events[i]);
}
mg_free(ctx->client_wait_events);
}
+/* Start one additional worker thread, if the configured limit allows it.
+ *
+ * ctx:                     the server context owning the worker tables.
+ * only_if_no_idle_threads: if non-zero, do nothing when at least one idle
+ *                          worker thread is already waiting for work.
+ *
+ * Returns 0 on success, -1 if all cfg_max_worker_threads slots are already
+ * used, -2 if an idle worker made spawning unnecessary, or the non-zero
+ * result of mg_start_thread_with_id() if thread creation failed.
+ *
+ * NOTE(review): spawned_worker_threads is read and updated here without
+ * holding thread_mutex; per its declaration it is only modified by the
+ * master thread, so this function must be called from the master thread
+ * only — confirm no other caller is added later. */
+static int
+mg_start_worker_thread(struct mg_context *ctx, int only_if_no_idle_threads)
+{
+	int ret;
+	const unsigned int i = ctx->spawned_worker_threads;
+
+	if (i >= ctx->cfg_max_worker_threads) {
+		/* Worker-thread limit reached; the pool can never grow further. */
+		return -1;
+	}
+
+	(void)pthread_mutex_lock(&ctx->thread_mutex);
+	if ((only_if_no_idle_threads) && (ctx->idle_worker_thread_count > 0)) {
+		(void)pthread_mutex_unlock(&ctx->thread_mutex);
+		return -2; /* An idle worker is already available; nothing to do. */
+	}
+	/* Pre-increment the idle counter before spawning, so a request that
+	 * arrives while the new thread is still starting up does not trigger
+	 * yet another spawn for the same work. */
+	ctx->idle_worker_thread_count++;
+	(void)pthread_mutex_unlock(&ctx->thread_mutex);
+
+	ctx->worker_connections[i].phys_ctx = ctx;
+	ret = mg_start_thread_with_id(worker_thread,
+	                              &ctx->worker_connections[i],
+	                              &ctx->worker_threadids[i]);
+	if (ret == 0) {
+		ctx->spawned_worker_threads++; /* One more table slot is now filled. */
+		/* %u: spawned_worker_threads is unsigned int. */
+		DEBUG_TRACE("Started worker_thread #%u", ctx->spawned_worker_threads);
+	} else {
+		/* Thread creation failed: roll back the idle counter. */
+		(void)pthread_mutex_lock(&ctx->thread_mutex);
+		ctx->idle_worker_thread_count--;
+		(void)pthread_mutex_unlock(&ctx->thread_mutex);
+	}
+	return ret;
+}
+
CIVETWEB_API struct mg_context *
mg_start2(struct mg_init_data *init, struct mg_error_data *error)
{
struct mg_context *ctx;
const char *name, *value, *default_value;
- int idx, ok, workerthreadcount;
+ int idx, ok, prespawnthreadcount, workerthreadcount;
unsigned int i;
int itmp;
void (*exit_callback)(const struct mg_context *ctx) = 0;
#endif
/* Worker thread count option */
- workerthreadcount = atoi(ctx->dd.config[NUM_THREADS]);
+ workerthreadcount = atoi(ctx->dd.config[NUM_THREADS]);
+ prespawnthreadcount = atoi(ctx->dd.config[PRESPAWN_THREADS]);
+
+ if ((prespawnthreadcount < 0)||(prespawnthreadcount > workerthreadcount)) {
+ prespawnthreadcount = workerthreadcount; /* can't prespawn more than all of them! */
+ }
if ((workerthreadcount > MAX_WORKER_THREADS) || (workerthreadcount <= 0)) {
if (workerthreadcount <= 0) {
return NULL;
}
- ctx->cfg_worker_threads = ((unsigned int)(workerthreadcount));
- ctx->worker_threadids = (pthread_t *)mg_calloc_ctx(ctx->cfg_worker_threads,
+ ctx->cfg_max_worker_threads = ((unsigned int)(workerthreadcount));
+ ctx->worker_threadids = (pthread_t *)mg_calloc_ctx(ctx->cfg_max_worker_threads,
sizeof(pthread_t),
ctx);
if (error != NULL) {
error->code = MG_ERROR_DATA_CODE_OUT_OF_MEMORY;
error->code_sub =
- (unsigned)ctx->cfg_worker_threads * (unsigned)sizeof(pthread_t);
+ (unsigned)ctx->cfg_max_worker_threads * (unsigned)sizeof(pthread_t);
mg_snprintf(NULL,
NULL, /* No truncation check for error buffers */
error->text,
return NULL;
}
ctx->worker_connections =
- (struct mg_connection *)mg_calloc_ctx(ctx->cfg_worker_threads,
+ (struct mg_connection *)mg_calloc_ctx(ctx->cfg_max_worker_threads,
sizeof(struct mg_connection),
ctx);
if (ctx->worker_connections == NULL) {
if (error != NULL) {
error->code = MG_ERROR_DATA_CODE_OUT_OF_MEMORY;
- error->code_sub = (unsigned)ctx->cfg_worker_threads
+ error->code_sub = (unsigned)ctx->cfg_max_worker_threads
* (unsigned)sizeof(struct mg_connection);
mg_snprintf(NULL,
NULL, /* No truncation check for error buffers */
#if defined(ALTERNATIVE_QUEUE)
ctx->client_wait_events =
- (void **)mg_calloc_ctx(ctx->cfg_worker_threads,
+ (void **)mg_calloc_ctx(ctx->cfg_max_worker_threads,
sizeof(ctx->client_wait_events[0]),
ctx);
if (ctx->client_wait_events == NULL) {
if (error != NULL) {
error->code = MG_ERROR_DATA_CODE_OUT_OF_MEMORY;
- error->code_sub = (unsigned)ctx->cfg_worker_threads
+ error->code_sub = (unsigned)ctx->cfg_max_worker_threads
* (unsigned)sizeof(ctx->client_wait_events[0]);
mg_snprintf(NULL,
NULL, /* No truncation check for error buffers */
}
ctx->client_socks =
- (struct socket *)mg_calloc_ctx(ctx->cfg_worker_threads,
+ (struct socket *)mg_calloc_ctx(ctx->cfg_max_worker_threads,
sizeof(ctx->client_socks[0]),
ctx);
if (ctx->client_socks == NULL) {
if (error != NULL) {
error->code = MG_ERROR_DATA_CODE_OUT_OF_MEMORY;
- error->code_sub = (unsigned)ctx->cfg_worker_threads
+ error->code_sub = (unsigned)ctx->cfg_max_worker_threads
* (unsigned)sizeof(ctx->client_socks[0]);
mg_snprintf(NULL,
NULL, /* No truncation check for error buffers */
return NULL;
}
- for (i = 0; (unsigned)i < ctx->cfg_worker_threads; i++) {
+ for (i = 0; (unsigned)i < ctx->cfg_max_worker_threads; i++) {
ctx->client_wait_events[i] = event_create();
if (ctx->client_wait_events[i] == 0) {
const char *err_msg = "Error creating worker event %i";
ctx->context_type = CONTEXT_SERVER; /* server context */
/* Start worker threads */
- for (i = 0; i < ctx->cfg_worker_threads; i++) {
+ for (int i = 0; i < prespawnthreadcount; i++) {
/* worker_thread sets up the other fields */
- ctx->worker_connections[i].phys_ctx = ctx;
- if (mg_start_thread_with_id(worker_thread,
- &ctx->worker_connections[i],
- &ctx->worker_threadids[i])
- != 0) {
-
+ if (mg_start_worker_thread(ctx, 0) != 0) {
long error_no = (long)ERRNO;
/* thread was not created */
- if (i > 0) {
+ if (ctx->spawned_worker_threads > 0) {
/* If the second, third, ... thread cannot be created, set a
* warning, but keep running. */
mg_cry_ctx_internal(ctx,
"Cannot start worker thread %i: error %ld",
- i + 1,
+ ctx->spawned_worker_threads + 1,
error_no);
/* If the server initialization should stop here, all
return 0;
}
- if ((unsigned)idx >= ctx->cfg_worker_threads) {
+ if ((unsigned)idx >= ctx->cfg_max_worker_threads) {
/* Out of range */
return 0;
}