diff --git a/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.c b/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.c index fe6e46fb657..980c227ff93 100644 --- a/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.c +++ b/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.c @@ -34,7 +34,6 @@ static int in_elasticsearch_bulk_conn_event(void *data) ssize_t available; ssize_t bytes; char *tmp; - char *request_end; size_t request_len; struct flb_connection *connection; struct in_elasticsearch_bulk_conn *conn; @@ -98,47 +97,45 @@ static int in_elasticsearch_bulk_conn_event(void *data) /* Do more logic parsing and checks for this request */ in_elasticsearch_bulk_prot_handle(ctx, conn, &conn->session, &conn->request); - /* Evict the processed request from the connection buffer and reinitialize + /* + * Evict the processed request from the connection buffer and reinitialize * the HTTP parser. */ - request_end = NULL; + /* Use the last parser position as the request length */ + request_len = mk_http_parser_request_size(&conn->session.parser, + conn->buf_data, + conn->buf_len); - if (NULL != conn->request.data.data) { - request_end = &conn->request.data.data[conn->request.data.len]; + if (request_len == -1 || (request_len > conn->buf_len)) { + /* Unexpected but let's make sure things are safe */ + conn->buf_len = 0; + flb_plg_debug(ctx->ins, "request length exceeds buffer length, closing connection"); + in_elasticsearch_bulk_conn_del(conn); + return -1; } - else { - request_end = strstr(conn->buf_data, "\r\n\r\n"); - if(NULL != request_end) { - request_end = &request_end[4]; - } - } + /* If we have extra bytes in our bytes, adjust the extra bytes */ + if (0 < (conn->buf_len - request_len)) { + memmove(conn->buf_data, &conn->buf_data[request_len], + conn->buf_len - request_len); - if (NULL != request_end) { - request_len = (size_t)(request_end - conn->buf_data); - - if (0 < (conn->buf_len - request_len)) { - memmove(conn->buf_data, &conn->buf_data[request_len], - conn->buf_len - request_len); - - conn->buf_data[conn->buf_len - request_len] = '\0'; - conn->buf_len -= request_len; - } - else { - memset(conn->buf_data, 0, request_len); - - conn->buf_len = 0; - } - - /* Reinitialize the parser so the next request is properly - * handled, the additional memset intends to wipe any left over data - * from the headers parsed in the previous request. - */ - memset(&conn->session.parser, 0, sizeof(struct mk_http_parser)); - mk_http_parser_init(&conn->session.parser); - in_elasticsearch_bulk_conn_request_init(&conn->session, &conn->request); + conn->buf_data[conn->buf_len - request_len] = '\0'; + conn->buf_len -= request_len; } + else { + memset(conn->buf_data, 0, request_len); + conn->buf_len = 0; + } + + /* + * Reinitialize the parser so the next request is properly + * handled, the additional memset intends to wipe any left over data + * from the headers parsed in the previous request. + */ + memset(&conn->session.parser, 0, sizeof(struct mk_http_parser)); + mk_http_parser_init(&conn->session.parser); + in_elasticsearch_bulk_conn_request_init(&conn->session, &conn->request); } else if (status == MK_HTTP_PARSER_ERROR) { in_elasticsearch_bulk_prot_handle_error(ctx, conn, &conn->session, &conn->request); diff --git a/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c b/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c index c705af60d81..a2424413e19 100644 --- a/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c +++ b/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c @@ -621,6 +621,10 @@ static int process_payload(struct flb_in_elasticsearch *ctx, struct in_elasticse int gzip_compressed = FLB_FALSE; void *gz_data = NULL; size_t gz_size = -1; + char *out_chunked = NULL; + size_t out_chunked_size = 0; + char *payload_buf; + size_t payload_size; header = &session->parser.headers[MK_HEADER_CONTENT_TYPE]; if (header->key.data == NULL) { @@ -643,7 +647,7 @@ static int process_payload(struct flb_in_elasticsearch *ctx, struct in_elasticse return -1; } - if (request->data.len <= 0) { + if (request->data.len <= 0 && !mk_http_parser_is_content_chunked(&session->parser)) { send_response(conn, 400, "error: no payload found\n"); return -1; } @@ -664,8 +668,32 @@ static int process_payload(struct flb_in_elasticsearch *ctx, struct in_elasticse } if (type == HTTP_CONTENT_NDJSON || type == HTTP_CONTENT_JSON) { + /* Check if the data is chunked */ + payload_buf = NULL; + payload_size = 0; + + if (mk_http_parser_is_content_chunked(&session->parser)) { + ret = mk_http_parser_chunked_decode(&session->parser, + conn->buf_data, + conn->buf_len, + &out_chunked, + &out_chunked_size); + + if (ret == -1) { + send_response(conn, 400, "error: invalid chunked data\n"); + return -1; + } + + payload_buf = out_chunked; + payload_size = out_chunked_size; + } + else { + payload_buf = request->data.data; + payload_size = request->data.len; + } + if (gzip_compressed == FLB_TRUE) { - ret = flb_gzip_uncompress((void *) request->data.data, request->data.len, + ret = flb_gzip_uncompress((void *) payload_buf, payload_size, &gz_data, &gz_size); if (ret == -1) { flb_error("[elasticsearch_bulk_prot] gzip uncompress is failed"); @@ -675,10 +703,15 @@ static int process_payload(struct flb_in_elasticsearch *ctx, struct in_elasticse flb_free(gz_data); } else { - parse_payload_ndjson(ctx, tag, request->data.data, request->data.len, bulk_statuses); + parse_payload_ndjson(ctx, tag, payload_buf, payload_size, bulk_statuses); } } + /* release chunked data if has been set */ + if (out_chunked) { + mk_mem_free(out_chunked); + } + return 0; } @@ -856,7 +889,8 @@ int in_elasticsearch_bulk_prot_handle(struct flb_in_elasticsearch *ctx, mk_mem_free(uri); return -1; } - } else { + } + else { flb_sds_destroy(tag); mk_mem_free(uri); @@ -1056,6 +1090,7 @@ static int process_payload_ng(struct flb_http_request *request, return -1; } + printf("Processing payload 2 : %s\n", request->body); parse_payload_ndjson(context, tag, request->body, cfl_sds_len(request->body), bulk_statuses); return 0;