Skip to content

Commit

Permalink
Refactored the HttpClient and Connection to reuse code.
Browse files Browse the repository at this point in the history
Refactored the WebsocketConnection to be reused in server and client mode.
  • Loading branch information
slav-at-attachix committed Nov 2, 2018
1 parent e689c51 commit bde7db7
Show file tree
Hide file tree
Showing 17 changed files with 764 additions and 715 deletions.
246 changes: 40 additions & 206 deletions Sming/SmingCore/Network/Http/HttpConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,9 @@
#include "lwip/tcp_impl.h"
#endif

bool HttpConnection::parserSettingsInitialized = false;
http_parser_settings HttpConnection::parserSettings;

HttpConnection::HttpConnection(RequestQueue* queue) : TcpClient(false)
HttpConnection::HttpConnection(RequestQueue* queue) : HttpConnectionBase(HTTP_RESPONSE)
{
this->waitingQueue = queue;

http_parser_init(&parser, HTTP_RESPONSE);
parser.data = (void*)this;

if(!parserSettingsInitialized) {
memset(&parserSettings, 0, sizeof(parserSettings));

// Notification callbacks: on_message_begin, on_headers_complete, on_message_complete.
parserSettings.on_message_begin = staticOnMessageBegin;
parserSettings.on_headers_complete = staticOnHeadersComplete;
parserSettings.on_message_complete = staticOnMessageComplete;

#ifndef COMPACT_MODE
parserSettings.on_chunk_header = staticOnChunkHeader;
parserSettings.on_chunk_complete = staticOnChunkComplete;
#endif

// Data callbacks: on_url, (common) on_header_field, on_header_value, on_body;
#ifndef COMPACT_MODE
parserSettings.on_status = staticOnStatus;
#endif
parserSettings.on_header_field = staticOnHeaderField;
parserSettings.on_header_value = staticOnHeaderValue;
parserSettings.on_body = staticOnBody;

parserSettingsInitialized = true;
}
}

bool HttpConnection::connect(const String& host, int port, bool useSsl /* = false */, uint32_t sslOptions /* = 0 */)
Expand Down Expand Up @@ -151,30 +121,13 @@ void HttpConnection::reset()

response.reset();

lastWasValue = true;
lastData = "";
currentField = "";
state = eHCS_Ready;
HttpConnectionBase::reset();
}

err_t HttpConnection::onProtocolUpgrade(http_parser* parser)
int HttpConnection::onMessageBegin(http_parser* parser)
{
debug_w("onProtocolUpgrade: Protocol upgrade is not supported");
return ERR_ABRT;
}

int HttpConnection::staticOnMessageBegin(http_parser* parser)
{
HttpConnection* connection = (HttpConnection*)parser->data;
if(connection == nullptr) {
// something went wrong
return -1;
}

connection->reset();

connection->incomingRequest = connection->executionQueue.dequeue();
if(connection->incomingRequest == nullptr) {
incomingRequest = executionQueue.dequeue();
if(incomingRequest == nullptr) {
return 1; // there are no requests in the queue
}

Expand Down Expand Up @@ -219,55 +172,40 @@ HttpPartResult HttpConnection::multipartProducer()
return result;
}

int HttpConnection::staticOnMessageComplete(http_parser* parser)
int HttpConnection::onMessageComplete(http_parser* parser)
{
HttpConnection* connection = (HttpConnection*)parser->data;
if(connection == nullptr) {
// something went wrong
return -1;
}

if(!connection->incomingRequest) {
if(!incomingRequest) {
return -2; // no current request...
}

debug_d("staticOnMessageComplete: Execution queue: %d, %s", connection->executionQueue.count(),
connection->incomingRequest->uri.toString().c_str());
debug_d("staticOnMessageComplete: Execution queue: %d, %s", executionQueue.count(),
incomingRequest->uri.toString().c_str());

// we are finished with this request
int hasError = 0;
if(connection->incomingRequest->requestCompletedDelegate) {
bool success = (HTTP_PARSER_ERRNO(parser) == HPE_OK) && // false when the parsing has failed
(connection->response.code >= 200 &&
connection->response.code <= 399); // false when the HTTP status code is not ok
hasError = connection->incomingRequest->requestCompletedDelegate(*connection, success);
if(incomingRequest->requestCompletedDelegate) {
bool success = (HTTP_PARSER_ERRNO(parser) == HPE_OK) && // false when the parsing has failed
(response.code >= 200 && response.code <= 399); // false when the HTTP status code is not ok
hasError = incomingRequest->requestCompletedDelegate(*this, success);
}

if(connection->incomingRequest->retries > 0) {
connection->incomingRequest->retries--;
return (connection->executionQueue.enqueue(connection->incomingRequest) ? 0 : -1);
if(incomingRequest->retries > 0) {
incomingRequest->retries--;
return (executionQueue.enqueue(incomingRequest) ? 0 : -1);
}

delete connection->incomingRequest;
connection->incomingRequest = nullptr;
delete incomingRequest;
incomingRequest = nullptr;

if(!connection->executionQueue.count()) {
connection->onConnected(ERR_OK);
if(!executionQueue.count()) {
onConnected(ERR_OK);
}

return hasError;
}

int HttpConnection::staticOnHeadersComplete(http_parser* parser)
int HttpConnection::onHeadersComplete(const HttpHeaders& headers)
{
HttpConnection* connection = (HttpConnection*)parser->data;
if(connection == nullptr) {
// something went wrong
return -1;
}

debug_d("The headers are complete");

/* Callbacks should return non-zero to indicate an error. The parser will
* then halt execution.
*
Expand All @@ -283,121 +221,59 @@ int HttpConnection::staticOnHeadersComplete(http_parser* parser)
* `Upgrade` or `Connection: upgrade` headers.
*/

if(connection->incomingRequest == nullptr) {
if(incomingRequest == nullptr) {
// nothing to process right now...
return 1;
}

connection->response.code = parser->status_code;
response.headers.setMultiple(headers);
response.code = parser.status_code;

if(connection->incomingRequest->auth != nullptr) {
connection->incomingRequest->auth->setResponse(connection->getResponse());
if(incomingRequest->auth != nullptr) {
incomingRequest->auth->setResponse(getResponse());
}

int error = 0;
if(connection->incomingRequest->headersCompletedDelegate) {
error = connection->incomingRequest->headersCompletedDelegate(*connection, connection->response);
if(incomingRequest->headersCompletedDelegate) {
error = incomingRequest->headersCompletedDelegate(*this, response);
}

if(!error && connection->incomingRequest->method == HTTP_HEAD) {
if(!error && incomingRequest->method == HTTP_HEAD) {
error = 1;
}

if(!error) {
// set the response stream
if(connection->incomingRequest->responseStream != nullptr) {
connection->response.stream = connection->incomingRequest->responseStream;
connection->incomingRequest->responseStream = nullptr; // the response object will release that stream
if(incomingRequest->responseStream != nullptr) {
response.stream = incomingRequest->responseStream;
incomingRequest->responseStream = nullptr; // the response object will release that stream
} else {
connection->response.stream = new LimitedMemoryStream(NETWORK_SEND_BUFFER_SIZE);
response.stream = new LimitedMemoryStream(NETWORK_SEND_BUFFER_SIZE);
}
}

return error;
}

#ifndef COMPACT_MODE
int HttpConnection::staticOnStatus(http_parser* parser, const char* at, size_t length)
{
return 0;
}
#endif

int HttpConnection::staticOnHeaderField(http_parser* parser, const char* at, size_t length)
{
HttpConnection* connection = (HttpConnection*)parser->data;
if(connection == nullptr) {
// something went wrong
return -1;
}

if(connection->lastWasValue) {
// we are starting to process new header
connection->lastData = "";
connection->lastWasValue = false;
}
connection->lastData += String(at, length);

return 0;
}

int HttpConnection::staticOnHeaderValue(http_parser* parser, const char* at, size_t length)
int HttpConnection::onBody(const char* at, size_t length)
{
HttpConnection* connection = (HttpConnection*)parser->data;
if(connection == nullptr) {
// something went wrong
return -1;
if(incomingRequest->requestBodyDelegate) {
return incomingRequest->requestBodyDelegate(*this, at, length);
}

if(!connection->lastWasValue) {
connection->currentField = connection->lastData;
connection->response.headers[connection->currentField] = nullptr;
connection->lastWasValue = true;
}
connection->response.headers[connection->currentField] += String(at, length);

return 0;
}

int HttpConnection::staticOnBody(http_parser* parser, const char* at, size_t length)
{
HttpConnection* connection = (HttpConnection*)parser->data;
if(connection == nullptr) {
// something went wrong
return -1;
}

if(connection->incomingRequest->requestBodyDelegate) {
return connection->incomingRequest->requestBodyDelegate(*connection, at, length);
}

if(connection->response.stream != nullptr) {
int res = connection->response.stream->write((const uint8_t*)at, length);
if(response.stream != nullptr) {
int res = response.stream->write((const uint8_t*)at, length);
if(res != length) {
// unable to write the requested bytes - stop here...
delete connection->response.stream;
connection->response.stream = nullptr;
delete response.stream;
response.stream = nullptr;
return 1;
}
}

return 0;
}

#ifndef COMPACT_MODE
int HttpConnection::staticOnChunkHeader(http_parser* parser)
{
debug_d("On chunk header");
return 0;
}

int HttpConnection::staticOnChunkComplete(http_parser* parser)
{
debug_d("On chunk complete");
return 0;
}
#endif

void HttpConnection::onReadyToSendData(TcpConnectionEvent sourceEvent)
{
debug_d("HttpConnection::onReadyToSendData: waitingQueue.count: %d", waitingQueue->count());
Expand Down Expand Up @@ -556,48 +432,6 @@ HttpResponse* HttpConnection::getResponse()

// end of public methods for HttpConnection

err_t HttpConnection::onReceive(pbuf* buf)
{
if(buf == nullptr) {
// Disconnected, close it
return TcpClient::onReceive(buf);
}

pbuf* cur = buf;
int parsedBytes = 0;
while(cur != nullptr && cur->len > 0) {
parsedBytes += http_parser_execute(&parser, &parserSettings, (char*)cur->payload, cur->len);
if(HTTP_PARSER_ERRNO(&parser) != HPE_OK) {
// we ran into trouble - abort the connection
debug_e("HTTP parser error: %s", httpGetErrorName(HTTP_PARSER_ERRNO(&parser)).c_str());
cleanup();
TcpConnection::onReceive(nullptr);
return ERR_ABRT;
}

cur = cur->next;
}

if(parser.upgrade) {
return onProtocolUpgrade(&parser);
} else if(parsedBytes != buf->tot_len) {
TcpClient::onReceive(nullptr);

return ERR_ABRT;
}

// Fire ReadyToSend callback
TcpClient::onReceive(buf);

return ERR_OK;
}

void HttpConnection::onError(err_t err)
{
cleanup();
TcpClient::onError(err);
}

void HttpConnection::cleanup()
{
reset();
Expand Down
Loading

0 comments on commit bde7db7

Please sign in to comment.