Skip to content

Commit

Permalink
HTTP client fixes (#2163)
Browse files Browse the repository at this point in the history
- Improvements to pipelining.
- Improvements to retrying the current request(as is the case for URLs protected with digest authentication).
- Fixed post requests sending files and/or form data.
- Small improvements to the handing of failed writes in TcpConnection.
  • Loading branch information
mikee47 authored Dec 4, 2020
1 parent ac63b94 commit da225ba
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 36 deletions.
10 changes: 7 additions & 3 deletions Sming/Core/Data/Stream/MultipartStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@

IDataSourceStream* MultipartStream::getNextStream()
{
if(footerSent) {
return nullptr;
}

// Return content, if available
if(bodyPart.stream != nullptr) {
auto stream = bodyPart.stream;
Expand All @@ -31,13 +35,13 @@ IDataSourceStream* MultipartStream::getNextStream()
stream->print("\r\n");
stream->print("--");
stream->print(getBoundary());
if(bodyPart.headers == nullptr) {
// No more parts
if(!bodyPart) {
stream->print("--");
footerSent = true;
}
stream->print("\r\n");

if(bodyPart.headers != nullptr) {
if(bodyPart) {
if(bodyPart.stream != nullptr && !bodyPart.headers->contains(HTTP_HEADER_CONTENT_LENGTH)) {
auto avail = bodyPart.stream->available();
if(avail >= 0) {
Expand Down
13 changes: 10 additions & 3 deletions Sming/Core/Data/Stream/MultipartStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,14 @@ class MultipartStream : public MultiStream
* @brief Each result item contains a set of headers plus content stream
*/
struct BodyPart {
HttpHeaders* headers = nullptr;
IDataSourceStream* stream = nullptr;
HttpHeaders* headers{nullptr};
IDataSourceStream* stream{nullptr};

// Must always have headers, stream is optional (can be empty)
explicit operator bool() const
{
return headers != nullptr;
}
};

/**
Expand Down Expand Up @@ -59,7 +65,8 @@ class MultipartStream : public MultiStream
private:
Producer producer;
BodyPart bodyPart;
char boundary[16] = {0};
char boundary[16]{};
bool footerSent{false};
};

/**
Expand Down
45 changes: 25 additions & 20 deletions Sming/Core/Network/Http/HttpClientConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ bool HttpClientConnection::send(HttpRequest* request)

void HttpClientConnection::reset()
{
delete incomingRequest;
incomingRequest = nullptr;

response.reset();
Expand All @@ -64,7 +63,7 @@ void HttpClientConnection::reset()

int HttpClientConnection::onMessageBegin(http_parser* parser)
{
incomingRequest = executionQueue.dequeue();
incomingRequest = executionQueue.peek();
if(incomingRequest == nullptr) {
return 1; // there are no requests in the queue
}
Expand All @@ -77,7 +76,7 @@ MultipartStream::BodyPart HttpClientConnection::multipartProducer()
MultipartStream::BodyPart result;

if(outgoingRequest->files.count()) {
const String& name = outgoingRequest->files.keyAt(0);
String name = outgoingRequest->files.keyAt(0);
auto file = outgoingRequest->files.extractAt(0);
result.stream = file;

Expand Down Expand Up @@ -113,7 +112,7 @@ int HttpClientConnection::onMessageComplete(http_parser* parser)
return -2; // no current request...
}

debug_d("HCC::onMessageComplete: Execution queue: %d, %s", executionQueue.count(),
debug_d("HCC::onMessageComplete: executionQueue: %d, %s", executionQueue.count(),
incomingRequest->uri.toString().c_str());

// we are finished with this request
Expand All @@ -126,23 +125,32 @@ int HttpClientConnection::onMessageComplete(http_parser* parser)

if(incomingRequest->retries > 0) {
incomingRequest->retries--;
return (executionQueue.enqueue(incomingRequest) ? 0 : -1);
return 0;
}

executionQueue.dequeue();

delete incomingRequest;
incomingRequest = nullptr;

state = eHCS_Ready;

auto response = getResponse();

if(response->headers.contains(HTTP_HEADER_CONNECTION) &&
response->headers[HTTP_HEADER_CONNECTION].equalsIgnoreCase(_F("close"))) {
const String& headerConnection = static_cast<const HttpHeaders&>(response->headers)[HTTP_HEADER_CONNECTION];
if(headerConnection.equalsIgnoreCase(_F("close"))) {
allowPipe = false;
// if the server does not support keep-alive -> close the connection
// see: https://tools.ietf.org/html/rfc2616#section-14.10
debug_d("HCC::onMessageComplete: Closing as requested by server");
close();
} else if(executionQueue.count() == 0) {

return hasError;
}

allowPipe = true; // if the server supports keep-alive then it would most probably support also pipelining...

if(executionQueue.count() == 0) {
onConnected(ERR_OK);
}

Expand Down Expand Up @@ -225,7 +233,7 @@ int HttpClientConnection::onBody(const char* at, size_t length)

void HttpClientConnection::onReadyToSendData(TcpConnectionEvent sourceEvent)
{
debug_d("HCC::onReadyToSendData: executionQueue: %d, waitingQueue.count: %d", executionQueue.count(),
debug_d("HCC::onReadyToSendData: State: %d, executionQueue: %d, waitingQueue: %d", state, executionQueue.count(),
waitingQueue.count());

REENTER:
Expand All @@ -239,8 +247,8 @@ void HttpClientConnection::onReadyToSendData(TcpConnectionEvent sourceEvent)
}

// if the executionQueue is not empty then we have to check if we can pipeline that request
if(executionQueue.count()) {
if(!(request->method == HTTP_GET || request->method == HTTP_HEAD)) {
if(executionQueue.count() != 0) {
if(!(allowPipe && (request->method == HTTP_GET || request->method == HTTP_HEAD))) {
// if the current request cannot be pipelined -> break;
break;
}
Expand Down Expand Up @@ -302,17 +310,14 @@ void HttpClientConnection::onClosed()
{
if(waitingQueue.count() + executionQueue.count() > 0) {
debug_d("HCC::onClosed: Trying to reconnect and send pending requests");
reset();
init(HTTP_RESPONSE);

HttpRequest* request = nullptr;
if(executionQueue.count() > 0) {
request = executionQueue.peek();
} else {
request = waitingQueue.peek();
cleanup();
init(HTTP_RESPONSE);
auto request = waitingQueue.peek();
if(request != nullptr) {
bool useSsl = (request->uri.Scheme == URI_SCHEME_HTTP_SECURE);
connect(request->uri.Host, request->uri.getPort(), useSsl);
}
bool useSsl = (request->uri.Scheme == URI_SCHEME_HTTP_SECURE);
connect(request->uri.Host, request->uri.getPort(), useSsl);
}
}

Expand Down
11 changes: 11 additions & 0 deletions Sming/Core/Network/Http/HttpClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ class HttpClientConnection : public HttpConnection
}
}

err_t onConnected(err_t err) override
{
if(err == ERR_OK) {
state = eHCS_Ready;
}

return HttpConnection::onConnected(err);
}

private:
void sendRequestHeaders(HttpRequest* request);
bool sendRequestBody(HttpRequest* request);
Expand All @@ -93,6 +102,8 @@ class HttpClientConnection : public HttpConnection

HttpRequest* incomingRequest = nullptr;
HttpRequest* outgoingRequest = nullptr;

bool allowPipe = false; /// < Flag to specify if HTTP pipelining is allowed for this connection
};

/** @} */
1 change: 1 addition & 0 deletions Sming/Core/Network/Http/HttpConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ void HttpConnection::init(http_parser_type type)
http_parser_init(&parser, type);
parser.data = this;
setDefaultParser();
state = eHCS_Ready;
}

void HttpConnection::setDefaultParser()
Expand Down
2 changes: 1 addition & 1 deletion Sming/Core/Network/Http/HttpRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class HttpRequest
/* @deprecated Use methods of `uri.Query` instead */
String getQueryParameter(const String& parameterName, const String& defaultValue = nullptr) const
{
return reinterpret_cast<const HttpParams&>(uri.Query)[parameterName] ?: defaultValue;
return static_cast<const HttpParams&>(uri.Query)[parameterName] ?: defaultValue;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion Sming/Core/Network/MqttClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ int MqttClient::onMessageEnd(mqtt_message_t* message)
setBits(flags, MQTT_CLIENT_CONNECTED);
}

auto& handler = reinterpret_cast<const HandlerMap&>(eventHandlers)[message->common.type];
auto& handler = static_cast<const HandlerMap&>(eventHandlers)[message->common.type];
if(handler) {
return handler(*this, message);
}
Expand Down
13 changes: 8 additions & 5 deletions Sming/Core/Network/TcpConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,14 +254,17 @@ int TcpConnection::write(IDataSourceStream* stream)
int bytesWritten = write(buffer, bytesRead, TCP_WRITE_FLAG_COPY | TCP_WRITE_FLAG_MORE);
debug_tcp_d("Written: %d, Available: %u, isFinished: %d, PushCount: %u", bytesWritten, available,
stream->isFinished(), pushCount);
if(bytesWritten <= 0) {
continue;

if(bytesWritten < 0) {
break;
}

if(bytesWritten > 0) {
total += size_t(bytesWritten);
stream->seek(bytesWritten);
if(bytesWritten == 0) {
continue;
}

total += size_t(bytesWritten);
stream->seek(bytesWritten);
}

if(pushCount == 0) {
Expand Down
19 changes: 16 additions & 3 deletions Sming/Core/Network/TcpConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,21 @@ class TcpConnection : public IpConnection
virtual bool connect(IpAddress addr, uint16_t port, bool useSsl = false);
virtual void close();

// return -1 on error
/** @brief Writes string data directly to the TCP buffer
* @param data null terminated string
* @param apiflags TCP_WRITE_FLAG_COPY, TCP_WRITE_FLAG_MORE
* @retval int negative on error, 0 when retry is needed or possitive on success
*/
int writeString(const char* data, uint8_t apiflags = TCP_WRITE_FLAG_COPY)
{
return write(data, strlen(data), apiflags);
}

// return -1 on error
/** @brief Writes string data directly to the TCP buffer
* @param data
* @param apiflags TCP_WRITE_FLAG_COPY, TCP_WRITE_FLAG_MORE
* @retval int negative on error, 0 when retry is needed or possitive on success
*/
int writeString(const String& data, uint8_t apiflags = TCP_WRITE_FLAG_COPY)
{
return write(data.c_str(), data.length(), apiflags);
Expand All @@ -71,10 +79,15 @@ class TcpConnection : public IpConnection
* @param data
* @param len
* @param apiflags TCP_WRITE_FLAG_COPY, TCP_WRITE_FLAG_MORE
* @retval int -1 on error
* @retval int negative on error, 0 when retry is needed or possitive on success
*/
virtual int write(const char* data, int len, uint8_t apiflags = TCP_WRITE_FLAG_COPY);

/** @brief Writes stream data directly to the TCP buffer
* @param stream
* @param apiflags TCP_WRITE_FLAG_COPY, TCP_WRITE_FLAG_MORE
* @retval int negative on error, 0 when retry is needed or possitive on success
*/
int write(IDataSourceStream* stream);

uint16_t getAvailableWriteSize()
Expand Down

0 comments on commit da225ba

Please sign in to comment.