Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream optimisations #2123

Merged
merged 10 commits into from
Oct 24, 2020
2 changes: 1 addition & 1 deletion Sming/Core/Data/MailMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ MailMessage& MailMessage::addAttachment(IDataSourceStream* stream, MimeType mime

MailMessage& MailMessage::addAttachment(IDataSourceStream* stream, const String& mime, const String& filename)
{
HttpPartResult attachment;
MultipartStream::BodyPart attachment;
attachment.stream = stream;
attachment.headers = new HttpHeaders();
(*attachment.headers)[HTTP_HEADER_CONTENT_TYPE] = mime;
Expand Down
2 changes: 1 addition & 1 deletion Sming/Core/Data/MailMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class MailMessage
private:
IDataSourceStream* stream = nullptr;
HttpHeaders headers;
Vector<HttpPartResult> attachments;
Vector<MultipartStream::BodyPart> attachments;
};

/** @} */
32 changes: 24 additions & 8 deletions Sming/Core/Data/Stream/LimitedMemoryStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,40 @@ uint16_t LimitedMemoryStream::readMemoryBlock(char* data, int bufSize)
return written;
}

bool LimitedMemoryStream::seek(int len)
int LimitedMemoryStream::seekFrom(int offset, unsigned origin)
{
if(readPos + len > capacity) {
return false;
size_t newPos;
switch(origin) {
case SEEK_SET:
newPos = offset;
break;
case SEEK_CUR:
newPos = readPos + offset;
break;
case SEEK_END:
newPos = writePos + offset;
break;
default:
return -1;
}

readPos += len;
if(newPos > writePos) {
return -1;
}

return true;
readPos = newPos;
return readPos;
}

size_t LimitedMemoryStream::write(const uint8_t* data, size_t size)
{
if(writePos + size <= capacity) {
memcpy(buffer + writePos, data, size);
writePos += size;
auto len = std::min(capacity - writePos, size);
if(len != 0) {
memcpy(buffer + writePos, data, len);
writePos += len;
}

// Any data which couldn't be written is just discarded
return size;
}

Expand Down
12 changes: 10 additions & 2 deletions Sming/Core/Data/Stream/LimitedMemoryStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

/**
* @brief Memory stream operating on fixed-size buffer
* Once the limit is reached the stream will discard incoming bytes on write
* Once the limit is reached the stream will discard incoming bytes on write
*
* @ingroup stream
*/
Expand Down Expand Up @@ -52,14 +52,22 @@ class LimitedMemoryStream : public ReadWriteStream
return eSST_Memory;
}

/** @brief Get a pointer to the current position
* @retval "const char*" Pointer to current cursor position within the data stream
*/
char* getStreamPointer() const
{
return reinterpret_cast<char*>(buffer ? buffer + readPos : nullptr);
}

int available() override
{
return writePos - readPos;
}

uint16_t readMemoryBlock(char* data, int bufSize) override;

bool seek(int len) override;
int seekFrom(int offset, unsigned origin) override;

size_t write(const uint8_t* buffer, size_t size) override;

Expand Down
27 changes: 4 additions & 23 deletions Sming/Core/Data/Stream/MultiStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,18 @@

#include "MultiStream.h"

//Use base class documentation
uint16_t MultiStream::readMemoryBlock(char* data, int bufSize)
{
if(stream != nullptr && stream->isFinished()) {
delete stream;
stream = nullptr;
}

if(stream == nullptr && nextStream != nullptr) {
stream = nextStream;
nextStream = nullptr;
}

if(stream == nullptr) {
nextStream = getNextStream();
if(!nextStream) {
stream = getNextStream();
if(stream == nullptr) {
finished = true;
if(!onCompleted()) {
return 0;
}
} else {
onNextStream();
return 0;
}
}

Expand All @@ -42,14 +32,5 @@ uint16_t MultiStream::readMemoryBlock(char* data, int bufSize)

bool MultiStream::seek(int len)
{
if(stream == nullptr) {
return false;
}

return stream->seek(len);
}

bool MultiStream::isFinished()
{
return (finished && (stream == nullptr || stream->isFinished()));
return stream ? stream->seek(len) : false;
}
37 changes: 14 additions & 23 deletions Sming/Core/Data/Stream/MultiStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,55 +15,46 @@

#include "DataSourceStream.h"

/**
* @brief Base class for read-only stream which generates output from multiple source streams
* @ingroup stream data
*/
class MultiStream : public IDataSourceStream
{
public:
~MultiStream()
{
delete stream;
delete nextStream;
}

StreamType getStreamType() const override
{
return eSST_Unknown;
}

/**
* @brief Return the total length of the stream
* @retval int -1 is returned when the size cannot be determined
*/
int available() override
{
return -1;
}

//Use base class documentation
uint16_t readMemoryBlock(char* data, int bufSize) override;

//Use base class documentation
bool seek(int len) override;

//Use base class documentation
bool isFinished() override;

protected:
virtual IDataSourceStream* getNextStream() = 0;

virtual bool onCompleted()
{
return false;
}

virtual void onNextStream()
bool isFinished() override
{
stream = nextStream;
nextStream = nullptr;
return finished;
}

protected:
IDataSourceStream* stream = nullptr;
IDataSourceStream* nextStream = nullptr;
/**
* @brief Inherited class must implement this
* @retval IDataSourceStream* Next stream to be read out
* Return nullptr if there are no more streams.
*/
virtual IDataSourceStream* getNextStream() = 0;

private:
IDataSourceStream* stream = nullptr;
bool finished = false;
};
63 changes: 27 additions & 36 deletions Sming/Core/Data/Stream/MultipartStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,58 +13,49 @@
#include "MultipartStream.h"
#include "MemoryDataStream.h"

bool MultipartStream::onCompleted()
IDataSourceStream* MultipartStream::getNextStream()
{
if(this->stream != nullptr && !this->stream->isFinished()) {
debug_e("Overwriting unfinished stream!");
// Return content, if available
if(bodyPart.stream != nullptr) {
auto stream = bodyPart.stream;
bodyPart.stream = nullptr;
return stream;
}
delete this->stream;
this->stream = nullptr;

auto stream = new MemoryDataStream();
stream->ensureCapacity(4 + 16 + 4);
stream->print(_F("\r\n--"));
stream->print(getBoundary());
stream->print(_F("--\r\n"));

this->stream = stream;

return true;
}

void MultipartStream::onNextStream()
{
if(this->stream != nullptr && !this->stream->isFinished()) {
debug_e("Overwriting unfinished stream!");
}
delete this->stream;
this->stream = nullptr;
// Fetch next part to send
bodyPart = producer();

// Generate header fragment
auto stream = new MemoryDataStream();
stream->ensureCapacity(4 + 16 + 4);
stream->print(_F("\r\n--"));
stream->print("\r\n");
stream->print("--");
stream->print(getBoundary());
if(bodyPart.headers == nullptr) {
// No more parts
stream->print("--");
}
stream->print("\r\n");

if(result.headers != nullptr) {
if(!result.headers->contains(HTTP_HEADER_CONTENT_LENGTH)) {
if(result.stream != nullptr && result.stream->available() >= 0) {
(*result.headers)[HTTP_HEADER_CONTENT_LENGTH] = result.stream->available();
if(bodyPart.headers != nullptr) {
if(bodyPart.stream != nullptr && !bodyPart.headers->contains(HTTP_HEADER_CONTENT_LENGTH)) {
auto avail = bodyPart.stream->available();
if(avail >= 0) {
(*bodyPart.headers)[HTTP_HEADER_CONTENT_LENGTH] = avail;
}
}

for(unsigned i = 0; i < result.headers->count(); i++) {
stream->print((*result.headers)[i]);
for(unsigned i = 0; i < bodyPart.headers->count(); i++) {
stream->print((*bodyPart.headers)[i]);
}

delete result.headers;
result.headers = nullptr;
}
stream->print("\r\n");
delete bodyPart.headers;
bodyPart.headers = nullptr;

this->stream = stream;
stream->print("\r\n");
}

nextStream = result.stream;
return stream;
}

const char* MultipartStream::getBoundary()
Expand Down
60 changes: 35 additions & 25 deletions Sming/Core/Data/Stream/MultipartStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,36 @@
#include "Network/Http/HttpHeaders.h"

/**
* @brief Multipart stream class
* @ingroup stream data
*
* @{
* @brief Read-only stream for creating HTTP multi-part content
* @see See https://www.w3.org/Protocols/rfc1341/7_2_Multipart.html
* @ingroup stream data
*/

typedef struct {
HttpHeaders* headers = nullptr;
IDataSourceStream* stream = nullptr;
} HttpPartResult;

typedef Delegate<HttpPartResult()> HttpPartProducerDelegate;

class MultipartStream : public MultiStream
{
public:
MultipartStream(HttpPartProducerDelegate delegate) : producer(delegate)
/**
* @brief Each result item contains a set of headers plus content stream
*/
struct BodyPart {
HttpHeaders* headers = nullptr;
IDataSourceStream* stream = nullptr;
};

/**
* @brief Callback used to produce each result
*/
using Producer = Delegate<BodyPart()>;

MultipartStream(Producer delegate) : producer(delegate)
{
}

~MultipartStream()
{
delete bodyPart.headers;
delete bodyPart.stream;
}

/**
* @brief Returns the generated boundary
*
Expand All @@ -44,20 +54,20 @@ class MultipartStream : public MultiStream
const char* getBoundary();

protected:
IDataSourceStream* getNextStream() override
{
result = producer();
return result.stream;
}

void onNextStream() override;
bool onCompleted() override;
IDataSourceStream* getNextStream() override;

private:
HttpPartProducerDelegate producer;
HttpPartResult result;

Producer producer;
BodyPart bodyPart;
char boundary[16] = {0};
};

/** @} */
/**
* @deprecated Use `MultipartStream::BodyPart` instead
*/
typedef MultipartStream::BodyPart HttpPartResult SMING_DEPRECATED;

/**
* @deprecated Use `MultipartStream::Producer` instead
*/
typedef MultipartStream::Producer HttpPartProducerDelegate SMING_DEPRECATED;
Loading