static ngx_int_t ngx_http_proxy_v2_filter_init(void *data);
static ngx_int_t ngx_http_proxy_v2_non_buffered_filter(void *data,
ssize_t bytes);
+static ngx_int_t ngx_http_proxy_v2_body_filter(ngx_event_pipe_t *p,
+ ngx_buf_t *buf);
static ngx_int_t ngx_http_proxy_v2_process_control_frame(ngx_http_request_t *r,
ngx_http_proxy_v2_ctx_t *ctx, ngx_buf_t *b);
static ngx_int_t ngx_http_proxy_v2_skip_frame(ngx_http_proxy_v2_ctx_t *ctx,
u->rewrite_cookie = ngx_http_proxy_rewrite_cookie;
}
+ u->buffering = plcf->upstream.buffering;
+
+ u->pipe = ngx_pcalloc(r->pool, sizeof(ngx_event_pipe_t));
+ if (u->pipe == NULL) {
+ return NGX_HTTP_INTERNAL_SERVER_ERROR;
+ }
+
+ u->pipe->input_filter = ngx_http_proxy_v2_body_filter;
+ u->pipe->input_ctx = r;
+
u->input_filter_init = ngx_http_proxy_v2_filter_init;
u->input_filter = ngx_http_proxy_v2_non_buffered_filter;
u->input_filter_ctx = r;
u = r->upstream;
u->length = 0;
+ u->pipe->length = 0;
if (ctx->in == NULL
&& ctx->out == NULL
}
u->length = 0;
+ u->pipe->length = 0;
ctx->done = 1;
} else {
u->length = 1;
+ u->pipe->length = 1;
}
return NGX_OK;
}
+static ngx_int_t
+ngx_http_proxy_v2_body_filter(ngx_event_pipe_t *p, ngx_buf_t *b)
+{
+ ngx_int_t rc;
+ ngx_buf_t *buf, **prev;
+ ngx_chain_t *cl;
+ ngx_http_request_t *r;
+ ngx_http_proxy_v2_ctx_t *ctx;
+
+ if (b->pos == b->last) {
+ return NGX_OK;
+ }
+
+ r = p->input_ctx;
+ ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_v2_module);
+
+ if (ctx == NULL) {
+ return NGX_ERROR;
+ }
+
+ buf = NULL;
+ prev = &b->shadow;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+ "http proxy filter bytes:%z", b->last - b->pos);
+
+ for ( ;; ) {
+
+ rc = ngx_http_proxy_v2_process_frames(r, ctx, b);
+
+ if (rc == NGX_OK) {
+
+ /* copy data frame payload for buffering */
+
+ cl = ngx_chain_get_free_buf(p->pool, &p->free);
+ if (cl == NULL) {
+ return NGX_ERROR;
+ }
+
+ buf = cl->buf;
+
+ ngx_memzero(buf, sizeof(ngx_buf_t));
+
+ buf->pos = b->pos;
+ buf->start = b->start;
+ buf->end = b->end;
+ buf->tag = p->tag;
+ buf->temporary = 1;
+ buf->recycled = 1;
+
+ *prev = buf;
+ prev = &buf->shadow;
+
+ if (p->in) {
+ *p->last_in = cl;
+
+ } else {
+ p->in = cl;
+ }
+
+ p->last_in = &cl->next;
+
+ /* STUB */ buf->num = b->num;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+ "http proxy copy buf %p", buf->pos);
+
+ if (b->last - b->pos >= (ssize_t) ctx->rest - ctx->padding) {
+ b->pos += ctx->rest - ctx->padding;
+ buf->last = b->pos;
+ ctx->rest = ctx->padding;
+
+ } else {
+ ctx->rest -= b->last - b->pos;
+ b->pos = b->last;
+ buf->last = b->pos;
+ }
+
+ if (ctx->length != -1) {
+
+ if (buf->last - buf->pos > ctx->length) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "upstream sent response body larger "
+ "than indicated content length");
+ return NGX_ERROR;
+ }
+
+ ctx->length -= buf->last - buf->pos;
+ }
+
+ continue;
+ }
+
+ if (rc == NGX_DONE) {
+ p->length = 0;
+ break;
+ }
+
+ if (rc == NGX_AGAIN) {
+ break;
+ }
+
+ /* invalid response */
+
+ return NGX_ERROR;
+ }
+
+ if (buf) {
+ buf->shadow = b;
+ buf->last_shadow = 1;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
+ "input buf %p %z", buf->pos, buf->last - buf->pos);
+
+ return NGX_OK;
+ }
+
+ /* there is no data record in the buf, add it to free chain */
+
+ if (ngx_event_pipe_add_free_buf(p, b) != NGX_OK) {
+ return NGX_ERROR;
+ }
+
+ return NGX_OK;
+}
+
+
static ngx_int_t
ngx_http_proxy_v2_process_control_frame(ngx_http_request_t *r,
ngx_http_proxy_v2_ctx_t *ctx, ngx_buf_t *b)