]> git.feebdaed.xyz Git - 0xmirror/civetweb.git/commitdiff
Added demand-spawning of threads, and prespawn_threads config argument
authorJeremy Friesner <jaf@meyersound.com>
Sun, 14 May 2023 01:40:46 +0000 (18:40 -0700)
committerJeremy Friesner <jaf@meyersound.com>
Sun, 14 May 2023 04:16:22 +0000 (21:16 -0700)
docs/Embedding.md
docs/UserManual.md
resources/civetweb.conf
src/civetweb.c
test/page3.ssjs
unittest/private.c

index 0a0fa336f4c5d6a543918d593368a320ac621fc0..0291ff927aa8567ec0616f4a0676b5fbcb740666 100644 (file)
@@ -213,10 +213,15 @@ about web server instance:
 
 When `mg_start()` returns, all initialization is guaranteed to be complete
 (e.g. listening ports are opened, SSL is initialized, etc). `mg_start()` starts
-some threads: a master thread, that accepts new connections, and several
-worker threads, that process accepted connections. The number of worker threads
-is configurable via `num_threads` configuration option. That number puts a
+some threads: a master thread, that accepts new connections, and optionally some
+worker threads, that process accepted connections. The maximum number of worker
+threads is configurable via `num_threads` configuration option. That number puts a
 limit on number of simultaneous requests that can be handled by CivetWeb.
+
+The number of worker threads to be pre-spawned at startup is specified via the
+'prespawn_threads' configuration option; worker threads that are not pre-spawned
+will instead be demand-created the first time they are needed.
+
 If you embed CivetWeb into a program that uses SSL outside CivetWeb as well,
 you may need to initialize SSL before calling `mg_start()`, and set the pre-
 processor define `SSL_ALREADY_INITIALIZED`. This is not required if SSL is
index 358895f6a3fdeec57bcdbc3dbaf8ab09346e3696..e90ceef49197cf2705fa28b309bb77be2203483a 100644 (file)
@@ -558,8 +558,8 @@ The configuration value is approximate, the real limit might be a few bytes off.
 The minimum is 1024 (1 kB).
 
 ### num\_threads `50`
-Number of worker threads. CivetWeb handles each incoming connection in a
-separate thread. Therefore, the value of this option is effectively the number
+Maximum number of worker threads allowed. CivetWeb handles each incoming connection
+in a separate thread. Therefore, the value of this option is effectively the number
 of concurrent HTTP connections CivetWeb can handle.
 
 If there are more simultaneous requests (connection attempts), they are queued.
@@ -572,6 +572,15 @@ In case the clients are web browsers, it is recommended to use `num_threads` of
 at least 5, since browsers often establish multiple connections to load a single
 web page, including all linked documents (CSS, JavaScript, images, ...).
 
+### prespawn\_threads '0'
+Number of worker threads that should be pre-spawned by mg_start().  Defaults to
+0, meaning no worker threads will be pre-spawned at startup; rather, worker threads
+will be spawned when a new connection comes in and there aren't currently any
+idle worker threads available to handle it (if we haven't already reached the
+maximum worker-thread count as specified by num_threads).  If this value is
+specified less than zero, or greater than the value of num_threads, it will be
+treated as if it was specified to be equal to the value of num_threads.
+
 ### listen\_backlog `200`
 Maximum number of connections waiting to be accepted by the server operating system.
 Internally, this parameter is passed to the "listen" socket/system call.
@@ -876,8 +885,8 @@ All port, socket, process and thread specific parameters are per server:
 `enable_http2`, `enable_keep_alive`, `enable_websocket_ping_pong`,
 `keep_alive_timeout_ms`, `linger_timeout_ms`, `listen_backlog`,
 `listening_ports`, `lua_background_script`, `lua_background_script_params`,
-`max_request_size`, `num_threads`, `request_timeout_ms`, `run_as_user`,
-`tcp_nodelay`, `throttle`, `websocket_timeout_ms` + all options from `main.c`.
+`max_request_size`, `num_threads`, 'prespawn_threads', `request_timeout_ms`,
+`run_as_user`, `tcp_nodelay`, `throttle`, `websocket_timeout_ms` + all options from `main.c`.
 
 All other options can be set per domain. In particular
 `authentication_domain`, `document_root` and (for HTTPS) `ssl_certificate`
index c567548184f3528fd5e767727b7c5f458632bb7e..293f27b58e4a6479aaf4e61720b57d2ba970c822 100644 (file)
@@ -26,6 +26,7 @@ listening_ports 8080
 # extra_mime_types 
 # ssl_certificate 
 # num_threads 50
+# prespawn_threads 0
 # run_as_user 
 # url_rewrite_patterns 
 # hide_files_patterns 
index 064aca290d6e99ba263edb7b364dc5bd4e06a479..88883aee5d041548def96fab17519118551b1f41 100644 (file)
@@ -1934,6 +1934,7 @@ enum {
        /* Once for each server */
        LISTENING_PORTS,
        NUM_THREADS,
+       PRESPAWN_THREADS,
        RUN_AS_USER,
        CONFIG_TCP_NODELAY, /* Prepended CONFIG_ to avoid conflict with the
                             * socket option typedef TCP_NODELAY. */
@@ -2081,6 +2082,7 @@ static const struct mg_option config_options[] = {
     /* Once for each server */
     {"listening_ports", MG_CONFIG_TYPE_STRING_LIST, "8080"},
     {"num_threads", MG_CONFIG_TYPE_NUMBER, "50"},
+    {"prespawn_threads", MG_CONFIG_TYPE_NUMBER, "0"},
     {"run_as_user", MG_CONFIG_TYPE_STRING, NULL},
     {"tcp_nodelay", MG_CONFIG_TYPE_NUMBER, "0"},
     {"max_request_size", MG_CONFIG_TYPE_NUMBER, "16384"},
@@ -2394,7 +2396,14 @@ struct mg_context {
 
        pthread_t masterthreadid; /* The master thread ID */
        unsigned int
-           cfg_worker_threads;      /* The number of configured worker threads. */
+           cfg_max_worker_threads;  /* How many worker-threads we are allowed to create, total */
+
+       unsigned int
+           spawned_worker_threads;  /* How many worker-threads currently exist (modified by master thread) */
+       unsigned int
+           idle_worker_thread_count; /* How many worker-threads are currently sitting around with nothing to do */
+                                     /* Access to this value MUST be synchronized by thread_mutex */
+
        pthread_t *worker_threadids; /* The worker thread IDs */
        unsigned long starter_thread_idx; /* thread index which called mg_start */
 
@@ -17997,7 +18006,7 @@ mg_close_connection(struct mg_connection *conn)
                 * timeouts, we will just wait a few seconds in mg_join_thread. */
 
                /* join worker thread */
-               for (i = 0; i < conn->phys_ctx->cfg_worker_threads; i++) {
+               for (i = 0; i < conn->phys_ctx->spawned_worker_threads; i++) {
                        mg_join_thread(conn->phys_ctx->worker_threadids[i]);
                }
        }
@@ -19207,7 +19216,8 @@ mg_connect_websocket_client_impl(const struct mg_client_options *client_options,
        /* Now upgrade to ws/wss client context */
        conn->phys_ctx->user_data = user_data;
        conn->phys_ctx->context_type = CONTEXT_WS_CLIENT;
-       conn->phys_ctx->cfg_worker_threads = 1; /* one worker thread */
+       conn->phys_ctx->cfg_max_worker_threads = 1; /* one worker thread */
+       conn->phys_ctx->spawned_worker_threads = 1; /* one worker thread */
 
        /* Start a thread to read the websocket client connection
         * This thread will automatically stop when mg_disconnect is
@@ -19216,7 +19226,7 @@ mg_connect_websocket_client_impl(const struct mg_client_options *client_options,
                                    thread_data,
                                    conn->phys_ctx->worker_threadids)
            != 0) {
-               conn->phys_ctx->cfg_worker_threads = 0;
+               conn->phys_ctx->spawned_worker_threads = 0;
                mg_free(thread_data);
                mg_close_connection(conn);
                conn = NULL;
@@ -19587,6 +19597,7 @@ process_new_connection(struct mg_connection *conn)
 #endif
 }
 
+static int mg_start_worker_thread(struct mg_context *ctx, int only_if_no_idle_threads);  /* forward declaration */
 
 #if defined(ALTERNATIVE_QUEUE)
 
@@ -19595,8 +19606,10 @@ produce_socket(struct mg_context *ctx, const struct socket *sp)
 {
        unsigned int i;
 
+       (void)mg_start_worker_thread(ctx, 1);  /* will start a worker-thread only if there aren't currently any idle worker-threads */
+
        while (!ctx->stop_flag) {
-               for (i = 0; i < ctx->cfg_worker_threads; i++) {
+               for (i = 0; i < ctx->spawned_worker_threads; i++) {
                        /* find a free worker slot and signal it */
                        if (ctx->client_socks[i].in_use == 2) {
                                (void)pthread_mutex_lock(&ctx->thread_mutex);
@@ -19621,10 +19634,13 @@ produce_socket(struct mg_context *ctx, const struct socket *sp)
 
 
 static int
-consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index)
+consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index, int counter_was_preincremented)
 {
        DEBUG_TRACE("%s", "going idle");
        (void)pthread_mutex_lock(&ctx->thread_mutex);
+       if (counter_was_preincremented == 0) {  /* first call only: the master-thread pre-incremented this before he spawned us */
+               ctx->idle_worker_thread_count++;
+       }
        ctx->client_socks[thread_index].in_use = 2;
        (void)pthread_mutex_unlock(&ctx->thread_mutex);
 
@@ -19641,6 +19657,7 @@ consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index)
                }
                return 0;
        }
+       ctx->idle_worker_thread_count--;
        (void)pthread_mutex_unlock(&ctx->thread_mutex);
        if (sp->in_use == 1) {
                DEBUG_TRACE("grabbed socket %d, going busy", sp->sock);
@@ -19655,12 +19672,15 @@ consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index)
 
 /* Worker threads take accepted socket from the queue */
 static int
-consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index)
+consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index, int counter_was_preincremented)
 {
        (void)thread_index;
 
-       (void)pthread_mutex_lock(&ctx->thread_mutex);
        DEBUG_TRACE("%s", "going idle");
+       (void)pthread_mutex_lock(&ctx->thread_mutex);
+       if (counter_was_preincremented == 0) {  /* first call only: the master-thread pre-incremented this before he spawned us */
+               ctx->idle_worker_thread_count++;
+       }
 
        /* If the queue is empty, wait. We're idle at this point. */
        while ((ctx->sq_head == ctx->sq_tail)
@@ -19684,6 +19704,8 @@ consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index)
        }
 
        (void)pthread_cond_signal(&ctx->sq_empty);
+
+       ctx->idle_worker_thread_count--;
        (void)pthread_mutex_unlock(&ctx->thread_mutex);
 
        return STOP_FLAG_IS_ZERO(&ctx->stop_flag);
@@ -19730,6 +19752,8 @@ produce_socket(struct mg_context *ctx, const struct socket *sp)
 
        (void)pthread_cond_signal(&ctx->sq_full);
        (void)pthread_mutex_unlock(&ctx->thread_mutex);
+
+       (void)mg_start_worker_thread(ctx, 1);  /* will start a worker-thread only if there aren't currently any idle worker-threads */
 }
 #endif /* ALTERNATIVE_QUEUE */
 
@@ -19740,6 +19764,7 @@ worker_thread_run(struct mg_connection *conn)
        struct mg_context *ctx = conn->phys_ctx;
        int thread_index;
        struct mg_workerTLS tls;
+       int first_call_to_consume_socket = 1;
 
        mg_set_thread_name("worker");
 
@@ -19765,7 +19790,7 @@ worker_thread_run(struct mg_connection *conn)
        /* Connection structure has been pre-allocated */
        thread_index = (int)(conn - ctx->worker_connections);
        if ((thread_index < 0)
-           || ((unsigned)thread_index >= (unsigned)ctx->cfg_worker_threads)) {
+           || ((unsigned)thread_index >= (unsigned)ctx->cfg_max_worker_threads)) {
                mg_cry_ctx_internal(ctx,
                                    "Internal error: Invalid worker index %i",
                                    thread_index);
@@ -19806,7 +19831,8 @@ worker_thread_run(struct mg_connection *conn)
        /* Call consume_socket() even when ctx->stop_flag > 0, to let it
         * signal sq_empty condvar to wake up the master waiting in
         * produce_socket() */
-       while (consume_socket(ctx, &conn->client, thread_index)) {
+       while (consume_socket(ctx, &conn->client, thread_index, first_call_to_consume_socket)) {
+               first_call_to_consume_socket = 0;
 
                /* New connections must start with new protocol negotiation */
                tls.alpn_proto = NULL;
@@ -20188,7 +20214,7 @@ master_thread_run(struct mg_context *ctx)
 
        /* Wakeup workers that are waiting for connections to handle. */
 #if defined(ALTERNATIVE_QUEUE)
-       for (i = 0; i < ctx->cfg_worker_threads; i++) {
+       for (i = 0; i < ctx->spawned_worker_threads; i++) {
                event_signal(ctx->client_wait_events[i]);
        }
 #else
@@ -20198,7 +20224,7 @@ master_thread_run(struct mg_context *ctx)
 #endif
 
        /* Join all worker threads to avoid leaking threads. */
-       workerthreadcount = ctx->cfg_worker_threads;
+       workerthreadcount = ctx->spawned_worker_threads;
        for (i = 0; i < workerthreadcount; i++) {
                if (ctx->worker_threadids[i] != 0) {
                        mg_join_thread(ctx->worker_threadids[i]);
@@ -20302,7 +20328,7 @@ free_context(struct mg_context *ctx)
 #if defined(ALTERNATIVE_QUEUE)
        mg_free(ctx->client_socks);
        if (ctx->client_wait_events != NULL) {
-               for (i = 0; (unsigned)i < ctx->cfg_worker_threads; i++) {
+               for (i = 0; (unsigned)i < ctx->spawned_worker_threads; i++) {
                        event_destroy(ctx->client_wait_events[i]);
                }
                mg_free(ctx->client_wait_events);
@@ -20494,12 +20520,41 @@ legacy_init(const char **options)
 }
 
 
+static int mg_start_worker_thread(struct mg_context *ctx, int only_if_no_idle_threads) {
+       const unsigned int i = ctx->spawned_worker_threads;
+       if (i >= ctx->cfg_max_worker_threads) {
+               return -1;  /* Oops, we hit our worker-thread limit!  No more worker threads, ever! */
+       }
+
+       (void)pthread_mutex_lock(&ctx->thread_mutex);
+       if ((only_if_no_idle_threads)&&(ctx->idle_worker_thread_count > 0)) {
+               (void)pthread_mutex_unlock(&ctx->thread_mutex);
+               return -2;  /* There are idle threads available, so no need to spawn a new worker thread now */
+       }
+       ctx->idle_worker_thread_count++;  /* we do this here to avoid a race condition while the thread is starting up */
+       (void)pthread_mutex_unlock(&ctx->thread_mutex);
+
+       ctx->worker_connections[i].phys_ctx = ctx;
+       int ret = mg_start_thread_with_id(worker_thread,
+                                   &ctx->worker_connections[i],
+                                   &ctx->worker_threadids[i]);
+       if (ret == 0) {
+               ctx->spawned_worker_threads++;  /* note that we've filled another slot in the table */
+               DEBUG_TRACE("Started worker_thread #%i", ctx->spawned_worker_threads);
+       } else {
+               (void)pthread_mutex_lock(&ctx->thread_mutex);
+               ctx->idle_worker_thread_count--;  /* whoops, roll-back on error */
+               (void)pthread_mutex_unlock(&ctx->thread_mutex);
+       }
+       return ret;
+}
+
 CIVETWEB_API struct mg_context *
 mg_start2(struct mg_init_data *init, struct mg_error_data *error)
 {
        struct mg_context *ctx;
        const char *name, *value, *default_value;
-       int idx, ok, workerthreadcount;
+       int idx, ok, prespawnthreadcount, workerthreadcount;
        unsigned int i;
        int itmp;
        void (*exit_callback)(const struct mg_context *ctx) = 0;
@@ -20742,7 +20797,12 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
 #endif
 
        /* Worker thread count option */
-       workerthreadcount = atoi(ctx->dd.config[NUM_THREADS]);
+       workerthreadcount   = atoi(ctx->dd.config[NUM_THREADS]);
+       prespawnthreadcount = atoi(ctx->dd.config[PRESPAWN_THREADS]);
+
+       if ((prespawnthreadcount < 0)||(prespawnthreadcount > workerthreadcount)) {
+               prespawnthreadcount = workerthreadcount;  /* can't prespawn more than all of them! */
+       }
 
        if ((workerthreadcount > MAX_WORKER_THREADS) || (workerthreadcount <= 0)) {
                if (workerthreadcount <= 0) {
@@ -21007,8 +21067,8 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
                return NULL;
        }
 
-       ctx->cfg_worker_threads = ((unsigned int)(workerthreadcount));
-       ctx->worker_threadids = (pthread_t *)mg_calloc_ctx(ctx->cfg_worker_threads,
+       ctx->cfg_max_worker_threads = ((unsigned int)(workerthreadcount));
+       ctx->worker_threadids = (pthread_t *)mg_calloc_ctx(ctx->cfg_max_worker_threads,
                                                           sizeof(pthread_t),
                                                           ctx);
 
@@ -21019,7 +21079,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
                if (error != NULL) {
                        error->code = MG_ERROR_DATA_CODE_OUT_OF_MEMORY;
                        error->code_sub =
-                           (unsigned)ctx->cfg_worker_threads * (unsigned)sizeof(pthread_t);
+                           (unsigned)ctx->cfg_max_worker_threads * (unsigned)sizeof(pthread_t);
                        mg_snprintf(NULL,
                                    NULL, /* No truncation check for error buffers */
                                    error->text,
@@ -21033,7 +21093,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
                return NULL;
        }
        ctx->worker_connections =
-           (struct mg_connection *)mg_calloc_ctx(ctx->cfg_worker_threads,
+           (struct mg_connection *)mg_calloc_ctx(ctx->cfg_max_worker_threads,
                                                  sizeof(struct mg_connection),
                                                  ctx);
        if (ctx->worker_connections == NULL) {
@@ -21043,7 +21103,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
 
                if (error != NULL) {
                        error->code = MG_ERROR_DATA_CODE_OUT_OF_MEMORY;
-                       error->code_sub = (unsigned)ctx->cfg_worker_threads
+                       error->code_sub = (unsigned)ctx->cfg_max_worker_threads
                                          * (unsigned)sizeof(struct mg_connection);
                        mg_snprintf(NULL,
                                    NULL, /* No truncation check for error buffers */
@@ -21060,7 +21120,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
 
 #if defined(ALTERNATIVE_QUEUE)
        ctx->client_wait_events =
-           (void **)mg_calloc_ctx(ctx->cfg_worker_threads,
+           (void **)mg_calloc_ctx(ctx->cfg_max_worker_threads,
                                   sizeof(ctx->client_wait_events[0]),
                                   ctx);
        if (ctx->client_wait_events == NULL) {
@@ -21070,7 +21130,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
 
                if (error != NULL) {
                        error->code = MG_ERROR_DATA_CODE_OUT_OF_MEMORY;
-                       error->code_sub = (unsigned)ctx->cfg_worker_threads
+                       error->code_sub = (unsigned)ctx->cfg_max_worker_threads
                                          * (unsigned)sizeof(ctx->client_wait_events[0]);
                        mg_snprintf(NULL,
                                    NULL, /* No truncation check for error buffers */
@@ -21086,7 +21146,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
        }
 
        ctx->client_socks =
-           (struct socket *)mg_calloc_ctx(ctx->cfg_worker_threads,
+           (struct socket *)mg_calloc_ctx(ctx->cfg_max_worker_threads,
                                           sizeof(ctx->client_socks[0]),
                                           ctx);
        if (ctx->client_socks == NULL) {
@@ -21097,7 +21157,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
 
                if (error != NULL) {
                        error->code = MG_ERROR_DATA_CODE_OUT_OF_MEMORY;
-                       error->code_sub = (unsigned)ctx->cfg_worker_threads
+                       error->code_sub = (unsigned)ctx->cfg_max_worker_threads
                                          * (unsigned)sizeof(ctx->client_socks[0]);
                        mg_snprintf(NULL,
                                    NULL, /* No truncation check for error buffers */
@@ -21112,7 +21172,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
                return NULL;
        }
 
-       for (i = 0; (unsigned)i < ctx->cfg_worker_threads; i++) {
+       for (i = 0; (unsigned)i < ctx->cfg_max_worker_threads; i++) {
                ctx->client_wait_events[i] = event_create();
                if (ctx->client_wait_events[i] == 0) {
                        const char *err_msg = "Error creating worker event %i";
@@ -21176,23 +21236,18 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
        ctx->context_type = CONTEXT_SERVER; /* server context */
 
        /* Start worker threads */
-       for (i = 0; i < ctx->cfg_worker_threads; i++) {
+       for (int i = 0; i < prespawnthreadcount; i++) {
                /* worker_thread sets up the other fields */
-               ctx->worker_connections[i].phys_ctx = ctx;
-               if (mg_start_thread_with_id(worker_thread,
-                                           &ctx->worker_connections[i],
-                                           &ctx->worker_threadids[i])
-                   != 0) {
-
+               if (mg_start_worker_thread(ctx, 0) != 0) {
                        long error_no = (long)ERRNO;
 
                        /* thread was not created */
-                       if (i > 0) {
+                       if (ctx->spawned_worker_threads > 0) {
                                /* If the second, third, ... thread cannot be created, set a
                                 * warning, but keep running. */
                                mg_cry_ctx_internal(ctx,
                                                    "Cannot start worker thread %i: error %ld",
-                                                   i + 1,
+                                                   ctx->spawned_worker_threads + 1,
                                                    error_no);
 
                                /* If the server initialization should stop here, all
@@ -22122,7 +22177,7 @@ mg_get_connection_info(const struct mg_context *ctx,
                return 0;
        }
 
-       if ((unsigned)idx >= ctx->cfg_worker_threads) {
+       if ((unsigned)idx >= ctx->cfg_max_worker_threads) {
                /* Out of range */
                return 0;
        }
index 2453799873a6a98d32b2d977f616805d92230372..6e4f62f96fd9d7006b396a9ae503ae8831fec031 100644 (file)
@@ -22,6 +22,7 @@ opts = [
 "fallback_document_root",
 "ssl_certificate",
 "num_threads",
+"prespawn_threads",
 "run_as_user",
 "url_rewrite_patterns",
 "hide_files_patterns",
index 8ea354526be182829c44d73ab5bfddd1bae28ca9..f9d6015ce168955daf92062006044c34d6fa240d 100644 (file)
@@ -1626,6 +1626,7 @@ START_TEST(test_config_options)
        ck_assert_str_eq("ssl_certificate_chain",
                         config_options[SSL_CERTIFICATE_CHAIN].name);
        ck_assert_str_eq("num_threads", config_options[NUM_THREADS].name);
+       ck_assert_str_eq("prespawn_threads", config_options[PRESPAWN_THREADS].name);
        ck_assert_str_eq("run_as_user", config_options[RUN_AS_USER].name);
        ck_assert_str_eq("url_rewrite_patterns",
                         config_options[URL_REWRITE_PATTERN].name);