From bf314e41c602b1e2e26c21e5ddab91a06e5361d5 Mon Sep 17 00:00:00 2001 From: Richard van Heest Date: Fri, 30 Sep 2016 09:31:07 +0200 Subject: [PATCH 1/6] setup of final chapter --- report/6.0-conclusion-and-future-work.tex | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/report/6.0-conclusion-and-future-work.tex b/report/6.0-conclusion-and-future-work.tex index 06b0f0e..e6a2957 100644 --- a/report/6.0-conclusion-and-future-work.tex +++ b/report/6.0-conclusion-and-future-work.tex @@ -1,20 +1,28 @@ \chapter{Conclusion and future work} +\todo{introduction, looking back, personal notes (???)} +\section{Conclusion} \todo{content here} + +\section{Future work} +\todo{discuss the potentials of future work as listed below} \begin{itemize} \item Backpressure \begin{itemize} - \item Push solution – control number of workers with certain metrics. This will presumably also work with hot streams such as UI events and time + \item Push solution - control number of workers with certain metrics. This will presumably also work with hot streams such as UI events and time \begin{itemize} \item Queue length \item Net queue length change \item In/out ratio per time unit \end{itemize} + \item Using RxJava2.x, you can apparently define a `backpressure policy' on the new \code{Flowable} type + \end{itemize} \item Feedback control \begin{itemize} \item It’s basically an algorithm to solve ‘control problems’. How is the problem class defined where feedback control can be used as an easy solution? \item It’s difficult to tune PID controllers. (refer to blog) Would it be possible to use ML for this process? - \item The PID controller originally comes from the field of physics and mechanical/electrical engineering and is considered to be the default controller, given its (mathematical) simplicity and quick response to change. Does this still hold for computer science, since we don’t need/use mathematical models? 
Or is there an alternative, easier to tune, controller that could be considered as default in computer science? +	\item The PID controller originally comes from the field of physics and mechanical/electrical engineering and is considered to be the default controller, given its (mathematical) simplicity and quick response to change. Does this still hold for computer science, since we don’t need/use mathematical models? Or is there an alternative, easier to tune, controller that could be considered as default in computer science? (also take into account that the PID controller makes the assumption of a \textit{continuous} world, whereas computer science presumes a \textit{discrete} world) +	\item We derived a feedback API based on a transformation of $\obs \Rightarrow \obs$. Would it be possible to also make a similar API by just adding the feedback operator to the \obs API? This would allow for some form of recursion within the stream, allowing for a whole new class of problems to be solved using reactive programming. 
\end{itemize} \end{itemize} \ No newline at end of file From 2109b4164c0ca50dc16404848d259444cac226f2 Mon Sep 17 00:00:00 2001 From: Richard van Heest Date: Fri, 7 Oct 2016 10:39:21 +0200 Subject: [PATCH 2/6] add interval feedback system to proposals for future work --- report/6.0-conclusion-and-future-work.tex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/report/6.0-conclusion-and-future-work.tex b/report/6.0-conclusion-and-future-work.tex index e6a2957..554be41 100644 --- a/report/6.0-conclusion-and-future-work.tex +++ b/report/6.0-conclusion-and-future-work.tex @@ -16,7 +16,7 @@ \section{Future work} \item In/out ratio per time unit \end{itemize} \item Using RxJava2.x, you can apparently define a `backpressure policy' on the new \code{Flowable} type - + \item \code{interval} in our feedback system; control it using feedback rather than having it as a constant in \code{from} \end{itemize} \item Feedback control \begin{itemize} @@ -25,4 +25,4 @@ \section{Future work} \item The PID controller originally comes from the field of physics and mechanical/electrical engineering and is considered to be the default controller, given its (mathematical) simplicity and quick response to change. Does this still hold for computer science, since we don’t need/use mathematical models? Or is there an alternative, easier to tune, controller that could be considered as default in computer science? (also take into account that the PID controller makes the assumption of a \textit{continuous} world, whereas computer science presumes a \textit{discrete} world) \item We derived a feedback API based on a transformation of $\obs \Rightarrow \obs$. Would it be possible to also make a similar API by just adding the feedback operator to the \obs API? This would allow for some form of recursion within the stream, allowing for a whole new class of problems to be solved using reactive programming. 
\end{itemize} -\end{itemize} \ No newline at end of file +\end{itemize} From 6c6ce6d8c9ee9b3ab570bafbb41e566aa49891ef Mon Sep 17 00:00:00 2001 From: Richard van Heest Date: Fri, 21 Oct 2016 16:13:43 +0200 Subject: [PATCH 3/6] reorganizing chapter 5 --- report/0.0-acknowledgment.tex | 4 +- report/1.2-intro-to-rx.tex | 1 + ...-overproduction-using-feedback-control.tex | 18 +- report/5.1-buffer-control.tex | 162 ------------------ report/5.1-overview.tex | 21 +++ .../5.2-universal-interactive-interface.tex | 25 +++ report/5.3-feedback-system.tex | 59 +++++++ report/5.4-controller.tex | 37 ++++ report/5.5-backpressure-alternative.tex | 18 ++ 9 files changed, 174 insertions(+), 171 deletions(-) delete mode 100644 report/5.1-buffer-control.tex create mode 100644 report/5.1-overview.tex create mode 100644 report/5.2-universal-interactive-interface.tex create mode 100644 report/5.3-feedback-system.tex create mode 100644 report/5.4-controller.tex create mode 100644 report/5.5-backpressure-alternative.tex diff --git a/report/0.0-acknowledgment.tex b/report/0.0-acknowledgment.tex index f855657..99d89cd 100644 --- a/report/0.0-acknowledgment.tex +++ b/report/0.0-acknowledgment.tex @@ -2,9 +2,9 @@ \chapter*{Acknowledgment} I would like to thank Erik Meijer, my supervisor during this thesis project, for all his support, enthusiasm and hacker mentality. \textit{Erik, it was an honor to receive an email in May 2014, asking whether I was interested in doing my master thesis with you. From the beginning it was clear to both of us that Reactive Programming and Rx were a shared interest. It was great working with you on RxMobile, which has turned out to be very useful for this thesis. You really inspired me to have a hacker mentality, always keep trying to get things working and to never give up. 
I sincerely hope the completion of this thesis will not be the \code{onCompleted} event on the stream of our collaborations, but instead will just be the \code{onNext} event of a finished project on a much longer stream!} -I also want to thank Georgios Gousios and Arie van Deursen for taking the time to be part of my thesis committee. +I also want to thank Georgios Gousios and Arie van Deursen for taking the time to be part of my thesis committee and providing me with great feedback. -Furthermore, I would like to thank my fellow students and friends at TU Delft, Eddy Bertoluzzo, Georgi Khomeriki, Mircea Voda, Mike de Waard and Lars Willems, who were doing their master theses in parallel with me. Thanks for the great time we had together at EWI-HB08.250 and the many discussions at the whiteboard and at Hangouts/Skype! +Furthermore, I would like to thank my fellow students and friends at TU Delft, Eddy Bertoluzzo, Georgi Khomeriki, Mircea Voda, Mike de Waard and Lars Willems, who were doing their master theses in parallel with me. Thanks for the great time we had together at EWI-HB 08.250 and the many discussions at the whiteboard and at Hangouts/Skype! A special thanks to Michel van Heest, my brother, for creating all the diagrams in this thesis. diff --git a/report/1.2-intro-to-rx.tex b/report/1.2-intro-to-rx.tex index 0e507a3..178fd17 100644 --- a/report/1.2-intro-to-rx.tex +++ b/report/1.2-intro-to-rx.tex @@ -1,4 +1,5 @@ \section{Reactive Extensions} +\label{sec:pure-rx-interfaces} There have been many attempts to fit the philosophy of reactive programming into frameworks, APIs and even languages \cite{ReactiveX, meijer2015-Dart, Reactive-Streams, Akka, Elm, RxMobile}. In this section, we will discuss some of the features of one of these libraries, namely Reactive Extensions (a.k.a. Rx). 
This project started at Microsoft with an implementation in C\# \cite{meijer2010-Observable} (Rx.Net) and was later ported to Java, Scala, Groovy, Kotlin, JavaScript, Swift and many other languages by the open source community \cite{ReactiveX}. It is currently the standard library for programming in a reactive way. Unfortunately, these various translations have each been evolving in their own way, deviating from both the original implementation as well as each other. There are obvious minor changes such as operator names changing to conform to particular language standards, but behavior in various corner cases has changed as well. Most remarkable, however, is that some implementations are not even purely `reactive' anymore \cite{meijer2014-Derivation}. Given these deviations from the original paradigm and the state of complexity of these implementations, we decided to use a reference implementation of the original Rx that has recently been written in Scala by Erik Meijer et al. called RxMobile \cite{RxMobile}, with the purpose of creating a light-weight implementation for mobile app development. The following discussion and derivation of the API will, however, apply to both Reactive Extensions and RxMobile, and in this section we will therefore refer to both of them as `Rx'. diff --git a/report/5.0-solving-overproduction-using-feedback-control.tex b/report/5.0-solving-overproduction-using-feedback-control.tex index 9b9fef0..93e4020 100644 --- a/report/5.0-solving-overproduction-using-feedback-control.tex +++ b/report/5.0-solving-overproduction-using-feedback-control.tex @@ -5,14 +5,18 @@ \chapter{Solving overproduction with feedback control} We also discussed several solutions to overproduction in the light of these three groups of sources. We learned that \textit{avoiding} by grouping or dropping data works perfectly for hot and cold asynchronous sources as a first line of defense. 
\textit{Callstack blocking} on the other hand is something that is automatically done to cold synchronous sources but can potentially be dangerous to hot and cold asynchronous sources as they might form a buffer of calls on the stack. The \textit{Reactive Streams} solution and RxJava's \textit{reactive pull} are to be used on cold sources alone, and cannot work with a hot source as they go against the contract of reactiveness as defined in \cite{berry1991-Reactive}. -The central problem here is that we want a single reactive interface to share between all kinds of data streams. Although one might argue that you ought not to be using a reactive interface for an interactive (cold) source, we acknowledge the fact that in many circumstances it is more practical to view and treat them as `streaming' and `real-time' data rather than having them as interactive sources. In order to do so, we need a way to interact with cold sources in an overproduction-safe way. Reactive Streams and reactive pull achieve this by introducing the concept of backpressure and changing the reactive interface itself, making the consumer in charge, rather than the producer. Not only is this against the concept of reactiveness, it also gives many problems with implementing the operators defined on the reactive interface. +The central problem here is that we want a single reactive interface to share between all kinds of data streams\footnote{Although one might argue that you ought not to be using a reactive interface for an interactive (cold) source, we acknowledge the fact that in many circumstances it is more practical to view and treat them as `streaming' and `real-time' data rather than having them as interactive sources.}. Note that this already works for hot sources; by definition they are suitable for a reactive interface. We only need a way to interact with cold sources in an overproduction-safe way. 
-In this chapter we will propose an alternative to backpressure that makes use of the feedback systems described in \Cref{chap:intro-to-feedback-control,chap:feedback-api}. We already concluded that backpressure is not suitable for hot sources, so we will discard these from the discussion in this chapter. The solution proposed here will apply to interactive sources alone. +Reactive Streams and RxJava's reactive pull achieve this by introducing the concept of backpressure and changing the reactive interface itself, making the consumer in charge rather than the producer. Not only is this against the concept of reactiveness, it also causes many problems when implementing the operators defined on the reactive interface. -\input{5.1-buffer-control} +In this chapter we will propose an alternative way of dealing with cold sources that makes use of the feedback systems described in \Cref{chap:intro-to-feedback-control,chap:feedback-api}. We will show how the overproduction problem can be reduced to a control problem that can be solved using a feedback control system. Furthermore, we provide a design and implementation for this feedback system and show how it can be fitted into the purely reactive interface that was described in \Cref{sec:pure-rx-interfaces}. -\todo{not sure what to do for the next section in this chapter; want to discuss this with \href{https://github.com/gousiosg}{@gousiosg}} -\section*{Next section(s)???} -Most likely I want to show some examples of where to apply this feedback based overflow control. \href{https://github.com/GeorgiKhomeriki}{@georgikhomeriki} has a nice \href{https://gist.github.com/GeorgiKhomeriki/97cc281a086f5a12c96ad36fe95d0393}{gist} put together with the effects of backpressure on the RxJava Observable. 
I already experimented with this, translated it to Scala and added the solution from the previous section, as shown \href{https://github.com/Applied-Duality/Feedback-Control/blob/master/BackpressureControl/RxScala/src/main/scala/fbc/backpressure/BackpressureDemo.scala}{here}. However, I'm not sure if and how to present this. +\input{5.1-overview} -And what else should I do here? Not sure yet. I'm thinking of some form of scientific justification (after all, this is a university level master thesis), but I haven't figured out yet what to do. +\input{5.2-universal-interactive-interface} + +\input{5.3-feedback-system} + +\input{5.4-controller} + +\input{5.5-backpressure-alternative} diff --git a/report/5.1-buffer-control.tex b/report/5.1-buffer-control.tex deleted file mode 100644 index 11e6511..0000000 --- a/report/5.1-buffer-control.tex +++ /dev/null @@ -1,162 +0,0 @@ -\section{Controlling a buffer} -\label{sec:buffer-control} -RxJava points out on its wiki page \cite{RxJava-Wiki-Backpressure} about backpressure that it does not make the problem of overproduction in the source go away. It claims to only move ``\textit{the problem up the chain of operators to a point where it can be handled better}''. To do so, they created the \textit{reactive pull} mechanism with operators like \code{onBackpressureBuffer} and \code{onBackpressureDrop}, such that the flow control is moved up to these kinds of operators. - -We propose to move this flow control even further up the chain, up to the point where the source of the stream is drained in the pipeline of operators. Only there we can have maximum control over how much data is brought into the stream at a particular point in time. With this we do not have the need for infinite buffers as is the case in \code{onBackpressureBuffer}, nor do we have to drop unprocessed elements as is done with \code{onBackpressureDrop}. 
We propose to not wrap the cold source in \code{Observable.create} (or any of its derived factory methods) but to wrap it in a universal, interactive interface. This way our implementation does not depend on the kind of source we are dealing with. Given this interactive interface we can fill a \textit{bounded} buffer with as many elements as can be processed at a particular point in time. The buffer pulls data from the source on behalf of the subscriber, which gets as much data pushed at it as it is able to handle. Pushing an element from the buffer to the downstream will automatically block the thread for another element to be pushed until the first one is fully processed. - -To control the buffer's size, we will use feedback control. This makes total sense, as we don't know how fast the downstream is going to drain the buffer. However, it does not make any sense to use a certain size as the setpoint and compare the current size with it, as some `slow' consumers might go faster or slower than expected. Bounding the buffer to a certain fixed size defeats the purpose of the feedback system in this case, as we cannot dynamically grow or shrink the size as needed. On the other hand, it is also not possible to ask ``\textit{make sure the buffer is filled to its optimal size}''. A feedback system is not able to solve this, as it does not have a particular setpoint specified. - -Instead of controlling the buffer size directly, we choose to measure the ratio between what goes out of the buffer and what comes into the buffer. We will refer to this ratio as the system's throughput. In an optimal situation the amount of data that comes in is just as much as comes out of the system, so ideally this ratio must be $1.0$, which will be the setpoint of this system. Given the error that comes from the difference between the setpoint and the actual throughput, we can then determine how many elements to request from the source in the next iteration. 
The controller that does this will be discussed in a later section. - -The full feedback system is depicted in \Cref{fig:backpressure-feedback-system}. Here it is also clearly visible that the source itself is \emph{not} part of the feedback system, but is \emph{used} by the system to retrieve a certain number of elements. Also note that the \textit{downstream handler} is not part of the feedback system. Even though it \emph{interacts} with the buffer, it is an external force that influences the behavior of the system. Ultimately the \textit{downstream handler} is the part that exposes an \obs for an \obv to listen to. - -\begin{figure}[H] - \begin{center} - \includegraphics[width=0.8\textwidth]{figures/Backpressure-feedback-system.png} - \end{center} - \caption{Feedback system for controlling overproduction} - \label{fig:backpressure-feedback-system} -\end{figure} - -In the rest of this section we will go into the details of this feedback system and discuss both the theoretical background and implementations. - -\subsection{A universal, interactive interface} -As mentioned above, we propose to not wrap the (cold) source directly in \code{Observable.create}, but instead wrap it in a universal, interactive interface. This is necessary since there are many variants of interactive interfaces that all do the same thing, but each in a slightly different way. - -For example, the \itr interface has a \code{hasNext} and a \code{next} method, which respectively check if there is a next element and return the next element. C\#'s \ier, on the contrary, has methods such as \code{moveNext}, which fetches the next element and returns whether there is a next element, and \code{current}, which actually returns the next element. For SQL database interaction, Java defines a \code{ResultSet}. 
This interface has a method called \code{next}, which moves the cursor to the next row of the result, and methods such as \code{getInt(int columnIndex)} and \code{getString(int columnIndex)} to get the content of a specific type from a column in the row the cursor is pointing to. - -One thing these interfaces have in common is that they contain a method that fetches a single element and meanwhile blocks the thread it is operating on. If this fetch takes some time, your program will have to wait for the result to come in. To prevent this blocking behavior, we propose a universal interactive interface in which you request an element and subscribe to a stream on which \textit{eventually} this element will be emitted. Note that we separate the concerns of \textit{requesting} a next element and \textit{receiving} a next element. In this way, the program can still continue to operate and maybe do some other things while it is waiting for the requested element. - -Given that we will use this interface in a feedback system that controls a buffer, we will pose an extra requirement on this interface. As the feedback system's controller might conclude that $n > 1$ elements need to be requested from the source, we must have the possibility to do so. Rather than requesting 1 element $n$ times, we want to request $n$ elements at once. - -The complete interface is called \code{Requestable[T]} and is shown in \Cref{lst:universal-interactive-interface}. It contains a single abstract method \code{request(n: Int): Unit}, which is called whenever the user of this interface wants a certain number of elements from the source. The requested elements will at some point in time be emitted by the \obs that is returned by \code{results: Observable[T]}. If no more elements are available in the source, this \obs will terminate with an \code{onCompleted} event. 
The implementor of \code{Requestable} is expected to use the \code{subject} to bring elements into the stream, whereas the user of the interface is expected to observe \code{results} in order to get the requested data. Note that this is a \emph{hot} stream: element emission will not be repeated when a second \obv subscribes to the stream. - -Example implementations of this interface for \itr and \code{ResultSet} are included in \Cref{app:backpressure-solution}. - -\begin{minipage}{\linewidth} -\begin{lstlisting}[style=ScalaStyle, caption={Universal, interactive interface used in the feedback system}, label={lst:universal-interactive-interface}] -trait Requestable[T] { - - protected final val subject $=$ PublishSubject[T]() - - final def results: Observable[T] $=$ subject - - def request(n: Int): Unit -} -\end{lstlisting} -\end{minipage} - -\subsection{The feedback system} -Now that we are able to interact with any cold source via the \code{Requestable} interface, we can continue designing and discussing the actual feedback system that controls the size of the bounded buffer. As stated before, we do not control the \emph{actual} size of the buffer by using a setpoint of any arbitrary, fixed number of elements. Instead we observe how many elements were taken out of the buffer in relation to how many elements were in the buffer during a particular time span. With that we in fact do not control the buffer's \emph{size}, but rather control the \emph{throughput} of the buffer, while making changes to the number of elements that are requested from the source at every feedback cycle. - -The throughput in a particular time span ($\tau_t$) is defined in terms of how many elements there are to be consumed in relation to how many of these elements are actually being consumed. 
In a scenario where the elements that are not consumed in a certain time span are discarded or where the buffer is flushed at the end of each time span, the throughput would be equal to the ratio of how many elements were being consumed to how many elements were presented to be consumed in a certain time span. In our case, however, we do not wish to discard any elements but rather keep the left-over elements from the previous time span and make them part of what is available to be consumed in the next time span. With this we can define the throughput $\tau_t$ at time $t$ as - -\begin{equation}\label{eq:throughput-fraction} -\tau_t = \frac{q_{t-1} + n_t - q_t }{q_{t-1} + n_t} \text{ \textbf{with} }q_{t-1}\text{, }q_t\text{, }n_t\text{ integers} \geq 0 -\end{equation} - -or - -\begin{equation}\label{eq:throughput-simple} -\tau_t = 1 - \frac{q_t}{q_{t-1} + n_t} \text{ \textbf{with} }q_{t-1}\text{, }q_t\text{, }n_t\text{ integers} \geq 0 -\end{equation} - -In these formulas, $q_t$ is the size of the buffer at time $t$, whereas $n_t$ is the number of elements that have been put into the buffer between time $t - 1$ and $t$. - -\Cref{eq:throughput-simple} provides us with a sense of the range of $\tau_t$. Since $q_t \leq q_{t-1} + n_t$ (it is not possible to take out more elements than are present in the buffer) we can guarantee a lower bound for $\tau_t$ of $0.0$. Likewise, since $q_{t-1}, q_t, n_t \geq 0$, we can set an upper bound for $\tau_t$ of $1.0$. Still, there is the possibility of dividing by 0, but we will guard against this in the next couple of paragraphs. - -\begin{equation}\label{eq:range-of-tau} -0.0 \leq \tau_t \leq 1.0 -\end{equation} - -With $\tau$ as the metric for the feedback system that controls the buffer, it is not difficult to come up with an appropriate setpoint. We want the throughput to be as high as possible, which is, given \Cref{eq:range-of-tau}, $1.0$. - -The next point in designing this feedback loop is to determine when a new cycle starts. 
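As a quick sanity check, \Cref{eq:throughput-fraction} and the bounds of \Cref{eq:range-of-tau} can be transcribed into a few lines of plain Scala. This is only an illustrative sketch: the names (\code{ThroughputSketch}, \code{qBefore}, \code{in}, \code{qAfter}) are ours and not part of the thesis code, and the undefined case $q_{t-1} + n_t = 0$ is reported as \code{None} here, anticipating the merge-with-the-next-time-span guard discussed in the following paragraphs.

```scala
// Sketch of Eq. (throughput-fraction): tau_t = (q_{t-1} + n_t - q_t) / (q_{t-1} + n_t),
// with qBefore = q_{t-1}, in = n_t, qAfter = q_t; all >= 0 and qAfter <= qBefore + in.
object ThroughputSketch {

  // Returns None when qBefore + in == 0: no measurement is possible and the
  // time span would have to be merged with the next one.
  def throughput(qBefore: Int, in: Int, qAfter: Int): Option[Double] = {
    val available = qBefore + in // elements offered for consumption in this time span
    if (available == 0) None
    else Some((available - qAfter).toDouble / available)
  }

  def main(args: Array[String]): Unit = {
    println(throughput(4, 6, 2)) // 8 of the 10 available elements consumed: Some(0.8)
    println(throughput(3, 0, 3)) // nothing consumed: Some(0.0)
    println(throughput(0, 5, 0)) // everything consumed: Some(1.0)
    println(throughput(0, 0, 0)) // empty span: None, merge with the next span
  }
}
```

For all admissible inputs ($q_t \leq q_{t-1} + n_t$) the result stays within the $[0.0, 1.0]$ range stated in \Cref{eq:range-of-tau}.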
For this we have to observe that it will only make sense for a new cycle to start if the downstream has polled at least one element from the buffer. If in a certain time span the downstream is too busy processing one element, it does not make any sense to do a new measurement of the throughput. As new elements have been coming in based on the previous feedback cycle, but no elements have been taken out of the buffer, we do not need to request more elements. Instead, we just extend the time span by merging it with the next, until at least one element has been taken out of the buffer. Only then will the feedback loop run a new cycle. - -Note that using this definition of a feedback cycle is a guard against dividing by 0 in \Cref{eq:throughput-fraction,eq:throughput-simple}. This can only happen when at the start of a time span the buffer is empty and during this time span no elements are coming into the buffer. This can either be due to an unfortunate decision of the controller (which we will discuss in \Cref{subsec:controller-design}) to request no further elements from the source, even though the buffer is empty, or because it takes some amount of time before the source can produce its next element. If the buffer was empty at the start and no elements were coming in, the downstream would at no point during this time span be able to poll an element from the buffer. Because of this, the current time span is merged with the next time span, without running through a whole new cycle and therefore also without running into dividing by 0 while calculating $\tau$. - -\subsubsection*{Implementation} -With this metric and its constraints in mind, we can start implementing this system using the feedback API as described in \Cref{chap:feedback-api}. For now we will assume the existence of the controller that will be used in this feedback system, even though it will only be discussed in \Cref{subsec:controller-design}. 
We assume a value \code{controller} of type \code{Component[Double, Int]}, with an input as the difference between the setpoint value and the actual throughput, and an output as the number of elements to be requested from the source. We furthermore assume the existence of a value \code{source} of type \code{Requestable[T]} from which we can request these elements of a generic type \code{T}. - -The buffer is modeled as a \code{BlockingQueue[T]}, such that multiple threads (to put and poll respectively) can safely interact with it. Besides that we introduce two flags of type \code{AtomicBoolean}, which signal respectively whether the source is completed (it has no more values) and whether there has been a successful poll during the current feedback cycle. - -The code for the feedback system itself is shown in \Cref{lst:buffer-feedback-control}. Given the controller, we first send the number of requested elements to the source, which then starts producing at most this number of elements. These are received by the feedback system by listening to \code{source.results}. These elements are then put into the queue. - -To measure the throughput of the buffer, we collect the elements during a certain interval. From this we measure how many elements have come into the queue, as well as the total number of elements that are currently in the queue. As a side-effect we reset the flag for \code{pollerVisited} to false, since we are now done interacting with the queue. Also, we provide a default starting value for the feedback system at this point, since initially the queue was empty and no elements were going into the queue. It is necessary to do so, as we next compare the current situation with the previous situation by using a \code{buffer(2, 1)}. Finally, we compute the throughput as described in \Cref{eq:throughput-fraction}. This value is fed as the input of the next feedback cycle without performing any operations on the way back. 
- -\begin{minipage}{\linewidth} -\begin{lstlisting}[style=ScalaStyle, caption={Feedback system for controlling the buffer}, label={lst:buffer-feedback-control}] -controller - .tee(n $\Rightarrow$ source.request(n)) - .liftRx(_.publish(_ $\Rightarrow$ source.results)) - .tee(x $\Rightarrow$ queue.put(x)) - .liftRx(_.buffer(interval.filter(_ $\Rightarrow$ pollerVisited.get()))) | \label{line:interval-in-feedback} | - .map(in $\Rightarrow$ (in.size, queue.size)) - .tee(_ $\Rightarrow$ pollerVisited.compareAndSet(true, false)) - .startWith((0, 0)) // initially there is no input and the queue is empty - .liftRx(_.buffer(2, 1)) - .filter(_.size $==$ 2) - .map { - case Seq((_, queueBefore), (in, queueAfter)) $\Rightarrow$ - (queueBefore - queueAfter + in).toDouble / (queueBefore + in) - } - .feedback(throughput $\Rightarrow$ throughput) -\end{lstlisting} -\end{minipage} - -The rest of the code, the queue polling behavior, initialization of various values and the wrapping of the whole mechanic in an \code{Observable.apply}, are considered trivial. This can be found in \Cref{app:backpressure-solution}. - -\subsection{A special controller} -\label{subsec:controller-design} -The final piece of this feedback control system is the controller, whose job it is to transform the difference between the setpoint and the throughput into a new number of elements to be requested from the source. However, before introducing the controller used in this control system, we first have to observe a number of issues. - -We first have to consider the range of error values that is possible within this system. Since we have established that $\tau_t$ must be a value between 0.0 and 1.0 (\Cref{eq:range-of-tau}), and since we have set the setpoint to a value of 1.0, we must conclude that the range of values for the error must be between 0.0 and 1.0 as well (following \Cref{eq:tracking-error}). 
- -Although this bound may seem to be a good thing, it actually has some interesting implications for the controller of our choice. Janert observes in chapter 15 of his book \cite{janert2013-feedback} regarding such bounds that they are not symmetric around the setpoint and that it is not even possible to have a negative error. For a standard PID controller to work well, it should preferably have a range of errors that is symmetric around the setpoint. - -Janert suggests solving this problem by not fixing the setpoint at 1.0, but putting it ever so slightly below 1.0. With setpoints like 0.999 or 0.9995, he argues, we will do just as well, as the outcome of the controller will be an integer value rather than a floating point number. We are only able to add or subtract an entire element from the number to be requested. This, however, causes an unusual asymmetry in the tracking error. Although the error can now become negative, it can become far more positive than negative. Using a setpoint of 0.999, the tracking error on the negative side can be at most $0.999 - 1.0 = -0.001$. On the positive side, however, the tracking error can be at most 0.999, which is more than two orders of magnitude larger! As a control action originating from a PID controller is proportional to the error, it becomes clear that control actions that tend to increase the number of requested elements will be more than two orders of magnitude stronger than control actions that tend to decrease the number of requested elements. Janert therefore concludes that this is not at all desirable and moves on to a completely new type of controller. - -The problem that is discussed in chapter 15 of Janert's book is fairly similar to the situation at hand and so we will create a slightly modified version of his controller. We will keep the setpoint at 1.0, as stated in the previous section. Notice that with this the tracking error can never be negative. 
Also note that whenever $\tau_t = 1.0$, the tracking error will become zero. This can be interpreted as a signal from the downstream that it was completely able to keep up with the number of elements that were available in the buffer. Most likely this means that the number of requested elements was not high enough for the downstream to be kept busy all the time. We will therefore \textit{increment} the number of requested elements by 1 whenever this happens. - -A tracking error greater than zero, on the other hand, signals that the downstream was not able to keep up with the total number of elements that were already present in the buffer and those that were added to the buffer in the previous cycle. This can either mean that the downstream is \emph{incidentally} too busy to pull all elements out or that it can \emph{structurally} not handle the number of elements it is faced with. We will take an optimistic approach here and first assume that this is just an incidental occasion of less elements being consumed. Therefore we change nothing to the number of elements to be requested. We will however keep track of how many times in a row this situation of the tracking error being greater than zero occurs. Only if it happens a certain number of times in a row, we will \textit{decrement} the number of requested elements by 1. From that point on, we will monitor even closer and decrement once again (with briefer periods in between) if the throughput remains less than 1.0. If, however, the throughput comes back to 1.0, we consider it to be a satisfying number of elements, stop decrementing and start increasing the number of requested elements again. - -\subsubsection*{Implementation} -As the attentive reader may already have noticed, this is an incremental controller: it does not state how many elements should be requested next, but rather return by how many the number of elements to be requested should be increased or decreased. 
The actual number is calculated by an extra component added right after the controller, which does the integration over all historical $\Delta n$'s. - -The controller itself is basically a stateful class with a transformation method to construct the next state. Furthermore we have an initial state defined on this class to get everything started. Using the API defined in the previous chapter, we can wrap this state into a \code{Component} using RxMobile's \code{Observable.scanLeft} operator. After the newest $\Delta n$ is extracted from this state, we use a \code{Component.scanLeft} to compute the actual $n$. In this step we also prevent the requested number of elements to go negative. - -\begin{minipage}{\linewidth} -\begin{lstlisting}[style=ScalaStyle, caption={Controller implementation for controlling the buffer}, label={lst:buffer-controller}] -class Controller(time: Int, val change: Int) { - def handle(error: Double): Controller $=$ - if (error $==$ 0.0) new Controller(period1, 1) // throughput was 1.0 - else if (time $==$ 1) new Controller(period2, -1) - else new Controller(time - 1, 0) -} -object Controller { - def initial $=$ new Controller(period1, 0) -} - -val controller $=$ Component[Double, Controller](_.scanLeft(Controller.initial)(_ handle _)) - .drop(1) - .map(_.change) - .scanLeft(initialRequest)((sum, d) $\Rightarrow$ scala.math.max(0, sum + d)) -\end{lstlisting} -\end{minipage} - -\subsection{An alternative to backpressure} -In \Cref{sec:your-mouse-is-not-a-database} we proposed to take backpressure out of a library like RxJava and make it part of an interface which works on interactive sources specifically. With the development of a feedback system on buffer control as described in this section, we can create such an API, where we wrap an interactive source and let it draw and reactively emit them based on how fast the downstream can handle them. 
To be more precise, given an interactive source, we can wrap it in a \code{Requestable} as described in \Cref{lst:universal-interactive-interface} and use the feedback system in \Cref{lst:buffer-feedback-control} to request elements from the source and put them in the buffer. Then the downstream mechanism will poll the buffer continuously and emit these elements in a reactive fashion to a sequence of operators followed by a final \obv. With this we can create a function called \code{from} that takes a \code{Requestable[T]} as its argument and returns an \code{Observable[T]} that emits all elements in the source. - -\begin{lstlisting}[style=InlineScalaStyle] -def from[T](source: Requestable[T]): Observable[T] -\end{lstlisting} - -The code for this function mainly consists of a combination of \Cref{lst:universal-interactive-interface,lst:buffer-feedback-control,lst:buffer-controller} and some glue to make it all work together. Refer to \Cref{app:backpressure-solution} for the full implementation. - -One thing to highlight is that \code{from} not only takes the wrapped source as an input parameter, but also requires the interval at which the feedback system has to run. This is the \code{interval} which is used in \Cref{lst:buffer-feedback-control} \cref{line:interval-in-feedback} to determine when to do a next measurement of the throughput in the feedback system. - -\begin{lstlisting}[style=InlineScalaStyle] -def from[T](source: Requestable[T], interval: Duration): Observable[T] -\end{lstlisting} - -As one can imagine, this greatly influences the speed at which the elements are being emitted. If a source can emit elements immediately and the interval is set to 1 second, it will take much longer for all elements to be emitted than when it is set 1 millisecond. Of course the speed will ultimately be determined by how fast the downstream can consume any element. 
Given the discussion above on when a new feedback cycle is initiated by measuring the throughput, we can conclude that if \code{interval} is set too fast, it will not influence the performance of the system as a whole, as it will skip the intervals at which the downstream did not show any successful interaction with the queue. If, however, the interval is set at a slower rate than it takes for the downstream to drain the buffer, this will for obvious reasons negatively influence the performance of the system. One could propose to let \code{interval} be dynamically controlled using a feedback system. However, for now we decided to define it as a constant value in the system and make this part of future research. diff --git a/report/5.1-overview.tex b/report/5.1-overview.tex new file mode 100644 index 0000000..2fc84f9 --- /dev/null +++ b/report/5.1-overview.tex @@ -0,0 +1,21 @@ +\section{Overview} +\label{sec:buffer-control} +RxJava points out on its wiki page \cite{RxJava-Wiki-Backpressure} about backpressure that it does not make the problem of overproduction in the source go away. It claims to only move ``\textit{the problem up the chain of operators to a point where it can be handled better}''. To do so, they created the \textit{reactive pull} mechanism with operators like \code{onBackpressureBuffer} and \code{onBackpressureDrop}, such that the flow control is moved up to these kinds of operators. + +We propose to move this flow control even further up the chain; up to the point where the source of the stream is drained in the pipeline of operators. Only there we can have maximum control over how much data is brought into the stream at a particular point in time. With this we do not have the need for infinite buffers as is the case in \code{onBackpressureBuffer}, nor do we have to drop unprocessed elements as is done with \code{onBackpressureDrop}. 
We propose not to wrap the cold source in \code{Observable.create} (or any of its derived factory methods) but to wrap it in a universal, interactive interface. This way our implementation does not depend on the kind of source we are dealing with. Given this interactive interface we can fill a \textit{bounded} buffer with as many elements as can be processed at a particular point in time. The buffer pulls data from the source on behalf of the subscriber, which gets as much data pushed at it as it is able to handle. Pushing an element from the buffer to the downstream automatically blocks the thread until that element is fully processed and the next one can be pushed.
+
+To control the buffer's size, we will use feedback control. This is a natural fit, since we do not know in advance how fast the downstream is going to drain the buffer. However, it makes no sense to use some fixed size as the setpoint and compare the current size against it, as `slow' consumers might turn out to go faster or slower than expected. Bounding the buffer to a fixed size defeats the purpose of the feedback system, as we can then no longer grow or shrink the buffer dynamically as needed. On the other hand, it is also not possible to ask ``\textit{make sure the buffer is filled to its optimal size}'': a feedback system cannot solve this, as no concrete setpoint is specified.
+
+Instead of controlling the buffer size directly, we choose to measure the ratio between what goes out of the buffer and what comes into the buffer. We will refer to this ratio as the system's throughput. In the optimal situation the amount of data that comes in is exactly the amount that goes out, so ideally this ratio is $1.0$, which will be the setpoint of this system. Given the error that results from the difference between the setpoint and the actual throughput, we can then determine how many elements to request from the source in the next iteration.
The controller that does this will be discussed in a later section.
+
+The full feedback system is depicted in \Cref{fig:backpressure-feedback-system}. Here it is also clearly visible that the source itself is \emph{not} part of the feedback system, but is \emph{used} by the system to retrieve a certain number of elements. Also note that the \textit{downstream handler} is not part of the feedback system: even though it \emph{interacts} with the buffer, it is an external force that influences the behavior of the system. Ultimately the \textit{downstream handler} is the part that exposes an \obs for an \obv to listen to.
+
+\begin{figure}[H]
+  \begin{center}
+    \includegraphics[width=0.8\textwidth]{figures/Backpressure-feedback-system.png}
+  \end{center}
+  \caption{Feedback system for controlling overproduction}
+  \label{fig:backpressure-feedback-system}
+\end{figure}
+
+In the rest of this chapter we will go into the details of this feedback system and discuss both its theoretical background and its implementation.

diff --git a/report/5.2-universal-interactive-interface.tex b/report/5.2-universal-interactive-interface.tex
new file mode 100644
index 0000000..d9fdf37
--- /dev/null
+++ b/report/5.2-universal-interactive-interface.tex
@@ -0,0 +1,25 @@
+\section{A universal, interactive interface}
+As mentioned above, we propose not to wrap the (cold) source directly in \code{Observable.create}, but instead to wrap it in a universal, interactive interface. This is necessary since there are many variants of interactive interfaces that all do the same thing, but each in a slightly different way.
+
+For example, the \itr interface has a \code{hasNext} and a \code{next} method, which respectively check whether there is a next element and return that next element. C\#'s \ier, on the contrary, has the methods \code{MoveNext}, which advances to the next element and returns whether one exists, and \code{Current}, which returns the element at the current position.
For SQL database interaction, Java defines a \code{ResultSet}. This interface has a method called \code{next}, which moves the cursor to the next row of the result, and methods such as \code{getInt(int columnIndex)} and \code{getString(int columnIndex)} to get the content of a specific type from a column in the row the cursor is pointing to.
+
+One thing these interfaces have in common is that they contain a method that fetches a single element and meanwhile blocks the thread it is operating on. If this fetch takes some time, your program will have to wait for the result to come in. To prevent this blocking behavior, we propose a universal interactive interface in which you request an element and subscribe to a stream on which this element will \textit{eventually} be emitted. Note that we separate the concerns of \textit{requesting} a next element and \textit{receiving} a next element. In this way, the program can continue to operate and maybe do some other things while it is waiting for the requested element.
+
+Given that we will use this interface in a feedback system that controls a buffer, we pose an extra requirement on it. As the feedback system's controller might conclude that $n > 1$ elements need to be requested from the source, we must have the possibility to do so. Rather than requesting 1 element $n$ times, we want to request $n$ elements at once.
+
+The complete interface is called \code{Requestable[T]} and is shown in \Cref{lst:universal-interactive-interface}. It contains a single abstract method \code{request(n: Int): Unit}, which is called whenever the user of this interface wants a certain number of elements from the source. The requested elements will at some point in time be emitted by the \obs that is returned by \code{results: Observable[T]}. If no more elements are available in the source, this \obs will terminate with an \code{onCompleted} event.
The implementor of \code{Requestable} is expected to use the \code{subject} to bring elements into the stream, whereas the user of the interface is expected to observe \code{results} in order to get the requested data. Note that this is a \emph{hot} stream: element emission will not be repeated when a second \obv subscribes to the stream.
+
+Example implementations of this interface for \itr and \code{ResultSet} are included in \Cref{app:backpressure-solution}.
+
+\begin{minipage}{\linewidth}
+\begin{lstlisting}[style=ScalaStyle, caption={Universal, interactive interface used in the feedback system}, label={lst:universal-interactive-interface}]
+trait Requestable[T] {
+
+  protected final val subject $=$ PublishSubject[T]()
+
+  final def results: Observable[T] $=$ subject
+
+  def request(n: Int): Unit
+}
+\end{lstlisting}
+\end{minipage}

diff --git a/report/5.3-feedback-system.tex b/report/5.3-feedback-system.tex
new file mode 100644
index 0000000..d4c994e
--- /dev/null
+++ b/report/5.3-feedback-system.tex
@@ -0,0 +1,59 @@
+\section{Controlling overproduction using feedback control}
+Now that we are able to interact with any cold source via the \code{Requestable} interface, we can continue with the design of the actual feedback system that controls the size of the bounded buffer. As stated before, we do not control the \emph{actual} size of the buffer using a setpoint of some arbitrary, fixed number of elements. Instead we observe how many elements were taken out of the buffer in relation to how many elements were in the buffer during a particular time span. With that we are in fact not controlling the buffer's \emph{size}, but rather its \emph{throughput}, while adjusting the number of elements that are requested from the source at every feedback cycle.
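Before turning to the throughput metric, it may help to see what wrapping a concrete cold source in \code{Requestable} could look like. The sketch below is self-contained and therefore uses a hand-rolled \code{MiniSubject} as a stand-in for RxMobile's \code{PublishSubject}; both \code{MiniSubject} and the name \code{IteratorRequestable} are our own and differ from the appendix implementations:

```scala
import scala.collection.mutable.ListBuffer

// Stand-in for PublishSubject/Observable so this sketch runs without RxMobile.
// Elements are delivered as Right(element), completion as Left(()).
class MiniSubject[T] {
  private val listeners = ListBuffer.empty[Either[Unit, T] => Unit]
  def subscribe(f: Either[Unit, T] => Unit): Unit = listeners += f
  def onNext(t: T): Unit = listeners.foreach(_(Right(t)))
  def onCompleted(): Unit = listeners.foreach(_(Left(())))
}

trait Requestable[T] {
  protected final val subject = new MiniSubject[T]
  final def results: MiniSubject[T] = subject
  def request(n: Int): Unit
}

// A cold Iterator wrapped as a Requestable: each request(n) pulls at most n
// elements and pushes them onto the hot `results` stream; completion is
// signaled once the iterator is exhausted.
class IteratorRequestable[T](it: Iterator[T]) extends Requestable[T] {
  def request(n: Int): Unit = {
    var i = 0
    while (i < n && it.hasNext) { subject.onNext(it.next()); i += 1 }
    if (!it.hasNext) subject.onCompleted()
  }
}
```

Note that, just as required above, a single `request(n)` pulls up to $n$ elements at once instead of requesting them one by one.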
+
+The throughput in a particular time span ($\tau_t$) is defined in terms of how many elements there are to be consumed in relation to how many of these elements are actually consumed. In a scenario where the elements that are not consumed in a certain time span are discarded, or where the buffer is flushed at the end of each time span, the throughput would simply be the ratio of the number of elements consumed to the number of elements offered for consumption in that time span. In our case, however, we do not wish to discard any elements, but rather keep the left-over elements from the previous time span and make them part of what is available to be consumed in the next time span. With this we can define the throughput $\tau_t$ at time $t$ as
+
+\begin{equation}\label{eq:throughput-fraction}
+\tau_t = \frac{q_{t-1} + n_t - q_t}{q_{t-1} + n_t} \text{ \textbf{with} }q_{t-1}\text{, }q_t\text{, }n_t\text{ integers} \geq 0
+\end{equation}
+
+or
+
+\begin{equation}\label{eq:throughput-simple}
+\tau_t = 1 - \frac{q_t}{q_{t-1} + n_t} \text{ \textbf{with} }q_{t-1}\text{, }q_t\text{, }n_t\text{ integers} \geq 0
+\end{equation}
+
+In these formulas, $q_t$ is the size of the buffer at time $t$, whereas $n_t$ is the number of elements that have been put into the buffer between time $t - 1$ and $t$.
+
+\Cref{eq:throughput-simple} gives us a sense of the range of $\tau_t$. Since $q_t \leq q_{t-1} + n_t$ (it is not possible to take out more elements than are present in the buffer), we can guarantee a lower bound for $\tau_t$ of $0.0$. Likewise, since $q_{t-1}, q_t, n_t \geq 0$, we can set an upper bound for $\tau_t$ of $1.0$. There is still the possibility of dividing by 0, but we will guard against this in the next couple of paragraphs.
+
+\begin{equation}\label{eq:range-of-tau}
+0.0 \leq \tau_t \leq 1.0
+\end{equation}
+
+With $\tau$ as the metric for the feedback system that controls the buffer, it is not difficult to come up with an appropriate setpoint. We want the throughput to be as high as possible, which is, given \Cref{eq:range-of-tau}, $1.0$.
+
+The next point in designing this feedback loop is to determine when a new cycle starts. For this we have to observe that it only makes sense for a new cycle to start if the downstream has polled at least one element from the buffer. If in a certain time span the downstream is too busy processing a single element, it makes no sense to take a new measurement of the throughput: new elements have been coming in based on the previous feedback cycle, but no elements have been taken out of the buffer, so we do not need to request more elements. Instead, we just extend the time span by merging it with the next one, until at least one element has been taken out of the buffer. Only then will the feedback loop run a new cycle.
+
+Note that this definition of a feedback cycle also guards against dividing by 0 in \Cref{eq:throughput-fraction,eq:throughput-simple}. A division by 0 can only happen when the buffer is empty at the start of a time span and no elements come into the buffer during that time span. This can either be due to an unfortunate decision of the controller (which we will discuss in \Cref{subsec:controller-design}) to request no further elements from the source, even though the buffer is empty, or because it takes some amount of time before the source can produce its next element. If the buffer was empty at the start and no elements were coming in, the downstream would at no point during this time span be able to poll an element from the buffer. Because of this, the current time span is merged with the next one, without running a whole new cycle and therefore also without dividing by 0 while calculating $\tau$.
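The throughput definition and the empty-cycle guard above can be sketched as a small function (a self-contained sketch; the name \code{throughput} and the \code{Option} encoding of a merged time span are our own):

```scala
// Throughput per the simplified formula: tau_t = 1 - q_t / (q_{t-1} + n_t).
// Returns None when q_{t-1} + n_t == 0, i.e. when nothing was available to be
// consumed, in which case the time span is merged with the next one instead
// of dividing by zero.
def throughput(qPrev: Int, qCur: Int, nIn: Int): Option[Double] = {
  val available = qPrev + nIn
  if (available == 0) None
  else Some(1.0 - qCur.toDouble / available)
}
```

For instance, a previous queue size of 2, 4 new elements and a remaining queue size of 3 yields a throughput of $1 - 3/6 = 0.5$.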
+
+\subsection*{Implementation}
+With this metric and its constraints in mind, we can start implementing this system using the feedback API described in \Cref{chap:feedback-api}. For now we will assume the existence of the controller that will be used in this feedback system, even though it will only be discussed in \Cref{subsec:controller-design}. We assume a value \code{controller} of type \code{Component[Double, Int]}, whose input is the difference between the setpoint value and the actual throughput, and whose output is the number of elements to be requested from the source. We furthermore assume the existence of a value \code{source} of type \code{Requestable[T]} from which we can request these elements of a generic type \code{T}.
+
+The buffer is modeled as a \code{BlockingQueue[T]}, such that multiple threads (to put and poll, respectively) can safely interact with it. Besides that, we introduce two flags of type \code{AtomicBoolean}, which signal respectively whether the source is completed (it has no more values) and whether there has been a successful poll during the current feedback cycle.
+
+The code for the feedback system itself is shown in \Cref{lst:buffer-feedback-control}. Given the controller, we first send the number of requested elements to the source, which then starts producing at most this number of elements. These are received by the feedback system by listening to \code{source.results} and are then put into the queue.
+
+To measure the throughput of the buffer, we collect the elements during a certain interval. From this we measure how many elements have come into the queue, as well as the total number of elements that are currently in the queue. As a side effect we reset the \code{pollerVisited} flag to false, since we are now done interacting with the queue. Also, we provide a default starting value for the feedback system at this point, since initially the queue was empty and no elements were going into the queue.
It is necessary to do so, as we next compare the current situation with the previous one using a \code{buffer(2, 1)}. Finally, we compute the throughput as described in \Cref{eq:throughput-fraction}. This value is fed back as the input of the next feedback cycle, without performing any operations on the way back.
+
+\begin{minipage}{\linewidth}
+\begin{lstlisting}[style=ScalaStyle, caption={Feedback system for controlling the buffer}, label={lst:buffer-feedback-control}]
+controller
+  .tee(n $\Rightarrow$ source.request(n))
+  .liftRx(_.publish(_ $\Rightarrow$ source.results))
+  .tee(x $\Rightarrow$ queue.put(x))
+  .liftRx(_.buffer(interval.filter(_ $\Rightarrow$ pollerVisited.get()))) | \label{line:interval-in-feedback} |
+  .map(in $\Rightarrow$ (in.size, queue.size))
+  .tee(_ $\Rightarrow$ pollerVisited.compareAndSet(true, false))
+  .startWith((0, 0)) // initially there is no input and the queue is empty
+  .liftRx(_.buffer(2, 1))
+  .filter(_.size $==$ 2)
+  .map {
+    case Seq((_, queueBefore), (in, queueAfter)) $\Rightarrow$
+      (queueBefore - queueAfter + in).toDouble / (queueBefore + in)
+  }
+  .feedback(throughput $\Rightarrow$ throughput)
+\end{lstlisting}
+\end{minipage}
+
+The rest of the code (the queue polling behavior, the initialization of various values and the wrapping of the whole mechanism in an \code{Observable.apply}) is considered trivial and can be found in \Cref{app:backpressure-solution}.

diff --git a/report/5.4-controller.tex b/report/5.4-controller.tex
new file mode 100644
index 0000000..dfcdacc
--- /dev/null
+++ b/report/5.4-controller.tex
@@ -0,0 +1,37 @@
+\section{A special controller}
+\label{subsec:controller-design}
+The final piece of this feedback control system is the controller, whose job it is to transform the difference between the setpoint and the throughput into a new number of elements to be requested from the source. However, before introducing the controller used in this control system, we first have to make a number of observations.
+
+We first have to consider the range of error values that is possible within this system. Since we have established that $\tau_t$ must be a value between 0.0 and 1.0 (\Cref{eq:range-of-tau}), and since we have set the setpoint to 1.0, we must conclude that the error is bounded between 0.0 and 1.0 as well (following \Cref{eq:tracking-error}).
+
+Although this bound may seem to be a good thing, it actually has some interesting implications for the controller of our choice. Janert observes in chapter 15 of his book \cite{janert2013-feedback} that bounds of this kind are not symmetric around the setpoint and that a negative error is not even possible. For a standard PID controller to work well, the range of errors should preferably be symmetric around the setpoint.
+
+Janert suggests solving this problem by not fixing the setpoint at 1.0 but putting it ever so slightly below 1.0. With setpoints like 0.999 or 0.9995, he argues, we will do just as well, as the outcome of the controller will be an integer value rather than a floating-point number: we can only add or subtract whole elements from the number to be requested. This, however, causes an unusual asymmetry in the tracking error. Although the error can now become negative, it can become far more positive. Using a setpoint of 0.999, the tracking error on the negative side can be at most $0.999 - 1.0 = -0.001$. On the positive side, however, the tracking error can be as large as 0.999, which is more than two orders of magnitude larger! As a control action originating from a PID controller is proportional to the error, control actions that increase the number of requested elements would be more than two orders of magnitude stronger than control actions that decrease it. Janert therefore concludes that this is not at all desirable and moves on to a completely different type of controller.
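To make this asymmetry concrete, a quick calculation with the suggested setpoint of 0.999, using the tracking-error definition from \Cref{eq:tracking-error} ($e = \text{setpoint} - \tau_t$; the value names below are our own):

```scala
// With setpoint 0.999 and the throughput bounded to [0.0, 1.0], the tracking
// error e = setpoint - throughput spans a very asymmetric range.
val setpoint = 0.999
val maxNegativeError = setpoint - 1.0 // throughput at its upper bound: -0.001
val maxPositiveError = setpoint - 0.0 // throughput at its lower bound:  0.999
val asymmetry = maxPositiveError / math.abs(maxNegativeError) // roughly 999x
```

A proportional control action on the positive side can thus be roughly 999 times stronger than on the negative side, which is the imbalance Janert objects to.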
+
+The problem discussed in chapter 15 of Janert's book is fairly similar to the situation at hand, so we will create a slightly modified version of his controller. We will keep the setpoint at 1.0, as stated in the previous section. Notice that with this choice the tracking error can never be negative. Also note that whenever $\tau_t = 1.0$, the tracking error becomes zero. This can be interpreted as a signal from the downstream that it was completely able to keep up with the number of elements that were available in the buffer. Most likely this means that the number of requested elements was not high enough to keep the downstream busy all the time. We will therefore \textit{increment} the number of requested elements by 1 whenever this happens.
+
+A tracking error greater than zero, on the other hand, signals that the downstream was not able to keep up with the total number of elements that were already present in the buffer plus those that were added to it in the previous cycle. This can either mean that the downstream is \emph{incidentally} too busy to pull all elements out or that it can \emph{structurally} not handle the number of elements it is faced with. We take an optimistic approach here and first assume that this is just an incidental occasion of fewer elements being consumed. Therefore we leave the number of elements to be requested unchanged. We do, however, keep track of how many times in a row the tracking error is greater than zero. Only if this happens a certain number of times in a row do we \textit{decrement} the number of requested elements by 1. From that point on, we monitor even more closely and decrement once again (with briefer periods in between) if the throughput remains less than 1.0. If, however, the throughput comes back to 1.0, we consider the current number of elements satisfactory, stop decrementing and start increasing the number of requested elements again.
+
+\subsection*{Implementation}
+As the attentive reader may already have noticed, this is an incremental controller: it does not state how many elements should be requested next, but rather returns the amount by which the number of elements to be requested should be increased or decreased. The actual number is calculated by an extra component added right after the controller, which integrates over all historical $\Delta n$'s.
+
+The controller itself is basically a stateful class with a transformation method to construct the next state. Furthermore, an initial state is defined on its companion object to get everything started. Using the API defined in the previous chapter, we can wrap this state into a \code{Component} using RxMobile's \code{Observable.scanLeft} operator. After the newest $\Delta n$ is extracted from this state, we use a \code{Component.scanLeft} to compute the actual $n$. In this step we also prevent the requested number of elements from going negative.
+
+\begin{minipage}{\linewidth}
+\begin{lstlisting}[style=ScalaStyle, caption={Controller implementation for controlling the buffer}, label={lst:buffer-controller}]
+class Controller(time: Int, val change: Int) {
+  def handle(error: Double): Controller $=$
+    if (error $==$ 0.0) new Controller(period1, 1) // throughput was 1.0
+    else if (time $==$ 1) new Controller(period2, -1)
+    else new Controller(time - 1, 0)
+}
+object Controller {
+  def initial $=$ new Controller(period1, 0)
+}
+
+val controller $=$ Component[Double, Controller](_.scanLeft(Controller.initial)(_ handle _))
+  .drop(1)
+  .map(_.change)
+  .scanLeft(initialRequest)((sum, d) $\Rightarrow$ scala.math.max(0, sum + d))
+\end{lstlisting}
+\end{minipage}

diff --git a/report/5.5-backpressure-alternative.tex b/report/5.5-backpressure-alternative.tex
new file mode 100644
index 0000000..98854dc
--- /dev/null
+++ b/report/5.5-backpressure-alternative.tex
@@ -0,0 +1,18 @@
+\section{An API for cold sources}
+With the development of a feedback system on
buffer control as described in the previous sections, we can create an API that wraps an interactive source and lets it draw elements from that source and emit them reactively, based on how fast the downstream can handle these elements. To be more precise, given an interactive source, we can wrap it in a \code{Requestable} as described in \Cref{lst:universal-interactive-interface} and use the feedback system in \Cref{lst:buffer-feedback-control} to request elements from the source and put them in the buffer. The downstream mechanism will then poll the buffer continuously and emit these elements in a reactive fashion to a sequence of operators followed by a final \obv. With this we can create a function called \code{from} that takes a \code{Requestable[T]} as its argument and returns an \code{Observable[T]} that emits all elements in the source.
+
+\begin{lstlisting}[style=InlineScalaStyle]
+def from[T](source: Requestable[T]): Observable[T]
+\end{lstlisting}
+
+The code for this function mainly consists of a combination of \Cref{lst:universal-interactive-interface,lst:buffer-feedback-control,lst:buffer-controller} and some glue to make it all work together. Refer to \Cref{app:backpressure-solution} for the full implementation.
+
+One thing to highlight is that \code{from} not only takes the wrapped source as an input parameter, but also requires the interval at which the feedback system has to run. This is the \code{interval} used in \Cref{lst:buffer-feedback-control} \cref{line:interval-in-feedback} to determine when to take the next measurement of the throughput in the feedback system.
+
+\begin{lstlisting}[style=InlineScalaStyle]
+def from[T](source: Requestable[T], interval: Duration): Observable[T]
+\end{lstlisting}
+
+As one can imagine, this greatly influences the speed at which the elements are emitted.
If a source can emit elements immediately and the interval is set to 1 second, it will take much longer for all elements to be emitted than when it is set to 1 millisecond. Of course, the speed will ultimately be determined by how fast the downstream can consume the elements. Given the discussion above on when a new feedback cycle is initiated by measuring the throughput, we can conclude that an \code{interval} that is set too short will not hurt the performance of the system as a whole, as the system skips the intervals in which the downstream did not show any successful interaction with the queue. If, however, the interval is set to a longer period than the downstream needs to drain the buffer, this will for obvious reasons negatively influence the performance of the system. One could propose to let \code{interval} be controlled dynamically using another feedback system; for now, however, we have decided to define it as a constant value in the system and leave this as future research.
+
+With this we have created an API that creates an \obs from an interactive source while taking the possibility of overflow into account. Note that we did not have to change or add anything to the Rx interface. Instead we moved the overflow protection to the top of the operator chain, to the point where the source is polled directly. Now, when an \obv subscribes to the wrapped cold source, the feedback system will pull elements from the source on behalf of the \obv and push them as a reactive stream to the \obv.
From 22cf6dce9d7b25ee6155bc3b5d23178cacf9b90e Mon Sep 17 00:00:00 2001 From: Richard van Heest Date: Fri, 21 Oct 2016 16:23:36 +0200 Subject: [PATCH 4/6] removed old outline file --- report/Outline.docx | Bin 25174 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 report/Outline.docx diff --git a/report/Outline.docx b/report/Outline.docx deleted file mode 100644 index e5d35f62a011dc820bf33c6ccba94baa169ee03b..0000000000000000000000000000000000000000

From: Richard van Heest Date: Sat, 22 Oct 2016 11:07:16 +0200 Subject: [PATCH 5/6] conclusion and future work so far --- report/0.1-introduction.tex | 2 +- report/2.1-hot-cold-streams.tex | 2 +- report/6.0-conclusion-and-future-work.tex | 46 +++++++++++++++++++++-- 3 files changed, 45 insertions(+), 5 deletions(-) diff --git a/report/0.1-introduction.tex b/report/0.1-introduction.tex index effd5cf..0496320 100644 --- a/report/0.1-introduction.tex +++ b/report/0.1-introduction.tex @@ -19,7 +19,7 @@ \section*{Research Questions} \addcontentsline{toc}{section}{Research Questions} This thesis answers a number of questions that are related to reactive programming, overproducing sources in a reactive context and feedback control, which are listed and briefly introduced in this section. -\subsubsection*{In what ways can a reactive program already be controlled to prevent overproduction?} +\subsubsection*{In which ways can a reactive program already be controlled to prevent overproduction?} There are multiple solutions for controlling overproduction in the context of reactive programming.
To understand these solutions, we first need to identify which types of reactive programs exist and consider their mutual similarities and differences. Based on this we can identify which existing solution for controlling overproduction will or will not work for each particular type of reactive program. This analysis can further be used to create a better understanding of which type of reactive program is suited for the solution proposed in this thesis. \subsubsection*{How can we implement a \emph{reactive} feedback system that is composed of smaller parts?} diff --git a/report/2.1-hot-cold-streams.tex b/report/2.1-hot-cold-streams.tex index be9374a..5f41acb 100644 --- a/report/2.1-hot-cold-streams.tex +++ b/report/2.1-hot-cold-streams.tex @@ -1,7 +1,7 @@ \section{Hot and cold streams} Applying the definition of reactiveness by Benveniste and Berry \cite{berry1991-Reactive} to the Rx \obs, we can conclude that every \obs sequence starts with a source that emits values at its own pace. No matter which function is used for this (\code{apply}, \code{range}, \code{timer}, \code{interval}, etc.), ultimately they are all the result of the \code{Observable.create} function. This function lifts an arbitrary source into the \obs interface and treats its values like streaming data. The behavior that is exposed by the resulting stream can, however, differ from source to source. -Sources like clocks, mouse moves or key presses start emitting regardless of any \obv being subscribed to the stream. When no one is listening, the data is simply discarded; when multiple \obv instances are subscribed, every one of them receives the same data at (approximately) the same time. In case an \obv subscribes at a later time, it will not receive all previously emitted data, but will only share in the data that is send after it is subscribed. This kind of source is considered to be \textit{hot}.
It is strictly reactive, meaning that it can only emit data at a speed which is determined by the source and has no way to be slowed down by any \obv that cannot cope with the amount of data sent. +Sources like clocks, stock tickers, mouse moves or key presses start emitting regardless of any \obv being subscribed to the stream. When no one is listening, the data is simply discarded; when multiple \obv instances are subscribed, every one of them receives the same data at (approximately) the same time. In case an \obv subscribes at a later time, it will not receive all previously emitted data, but will only share in the data that is sent after it is subscribed. This kind of source is considered to be \textit{hot}. It is strictly reactive, meaning that it can only emit data at a speed which is determined by the source and has no way to be slowed down by any \obv that cannot cope with the amount of data sent. On the other hand there are streams that originate from sources that are actually interactive. These include the results of database queries and \ieb sequences. Often they are wrapped into an \obs because of a potential delay that needs to be awaited before the result is returned, without blocking the program flow or call stack, or simply because the context of the program requires an \obs rather than an \ieb. Regarding the former, it may (for example in case of lazy evaluation) take some time before the \ier has produced its next element. The \obs will, however, not start emitting its data immediately, like the hot variant does, but will wait until at least one \obv is subscribed. Only then will it start producing its values. In case a second \obv subscribes, the stream will effectively duplicate itself and start all over again, emitting the first values. Unless specified by operator sequences, this means that the lazy \ier in the source will have to produce its values a second time as well.
In the end, both the first and second \obv have received the same set of data, even though the second subscribed much later than the first. A stream with this kind of behavior is referred to as a \textit{cold} \obs. diff --git a/report/6.0-conclusion-and-future-work.tex b/report/6.0-conclusion-and-future-work.tex index 554be41..f114062 100644 --- a/report/6.0-conclusion-and-future-work.tex +++ b/report/6.0-conclusion-and-future-work.tex @@ -1,10 +1,50 @@ -\chapter{Conclusion and future work} -\todo{introduction, looking back, personal notes (???)} +\chapter{Discussion} +In this chapter we summarize and conclude the insights of this thesis, structured by the research questions. We also propose a number of open challenges as future work. \section{Conclusion} -\todo{content here} +\subsection*{In which ways can a reactive program already be controlled to prevent overproduction?} +Overproduction occurs in a reactive system whenever the producer (which, according to the definition, is in charge of the emission rate of data) produces more elements than the consumer can handle. In case of a `bursty' system (where the producer emits many elements in a brief period of time and then emits few elements for a long period of time), the consumer can buffer the data and catch up during the `quiet' period. However, in the general case where the emission behavior of a fast producer is unknown, the data cannot be buffered without the risk of running out of memory. + +There are a couple of ways to prevent overproduction that are currently applied to reactive programs. However, these do not work for all kinds of sources that are wrapped in a reactive interface.
We distinguish the following kinds of sources, each with distinct behavior that influences which kinds of overflow protection can be applied: + +\begin{description} + \item [hot sources] are reactive by definition: they produce their events/elements at their own speed and without any intervention from an observer; these typically include stock tickers and mouse moves. + \item [cold asynchronous sources] are interactive sources that might take some arbitrary amount of time to produce a next element; these typically include network responses and results from a database query. + \item [cold synchronous sources] are interactive sources that return their next element immediately, without the need to do any computationally heavy work; these typically include in-memory collections such as a list. +\end{description} + +The only solutions that work for a \textit{hot source} are either discarding the data that cannot be processed immediately or grouping it into larger sections and processing it in these groups. + +These solutions also work for \textit{cold asynchronous sources}, as they are also bound by a certain notion of time. Besides that, the backpressure technique as proposed by Reactive Streams works for these sources as well. This is because these sources allow the consumer to manage the rate at which the producer is allowed to emit its data. + +Backpressure also works fine on \textit{cold synchronous sources}, although the best way to control their emission rates is already incorporated at the programming-language level: call stack blocking. Due to the nature of these streams, the next element can only be emitted once the current element is fully processed. Note that this technique is also naturally used in \textit{hot} and \textit{cold asynchronous sources}, but that there it can potentially lead to out-of-memory errors.
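+To make the strategies for a \textit{hot source} concrete, the sketch below shows both options in RxScala notation (the 100 millisecond window is an arbitrary example value; the grouping operator is called \code{buffer} in plain RxJava):
+
+\begin{lstlisting}[style=InlineScalaStyle]
+import scala.concurrent.duration._
+import rx.lang.scala.Observable
+
+// a hot source: a tick every millisecond, emitted regardless of observers
+val hot: Observable[Long] = Observable.interval(1.millisecond)
+
+// discarding: keep only the latest element of every 100 millisecond window
+hot.sample(100.milliseconds).subscribe(t => println(t))
+
+// grouping: collect the elements of every 100 millisecond window into a batch
+hot.tumblingBuffer(100.milliseconds).subscribe(batch => println(batch.length))
+\end{lstlisting}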
+ +\subsection*{How can we implement a \textit{reactive} feedback system that is composed of smaller parts?} +A feedback system can be seen as a mapping from an input stream to an output stream. In terms of control theory, the input stream represents the setpoint, and the output stream is the control output, which is fed back to be compared with the setpoint's latest value, as well as emitted for any other purpose. The same can be said of each individual component of the feedback system: it has an input stream that is transformed into an output stream. This output stream can also be considered the input stream of the next component. + +We derived an API for such a component that builds on top of the RxJava library. We also introduced operators for composing these components in both a sequential and parallel way and related these to the well-known structure of \textit{Arrow}s. Finally we introduced a set of operators that makes it possible to turn a sequence of components into a feedback system by feeding back the sequence's output stream to its input stream. The resulting API offers a concise way of describing a feedback system in just a couple of lines of code. + +\subsection*{How can the overproduction problem be reduced to a feedback control problem?} +We proposed a new solution to the overproduction problem that uses feedback control at its core. While Reactive Streams' backpressure implementation in RxJava moves the overflow control up to a certain operator that can be anywhere in the operator sequence, we chose to move it up to the point where the source is drained into the operator sequence. There we placed a buffer and control the number of elements that are being requested from the source. This is done by measuring the \textit{throughput} of the buffer, which is equal to the ratio between how many elements are being pulled out from the buffer and the number of elements that were in the buffer during a particular time frame.
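+Expressed as a formula (the symbols are introduced here for illustration only): given a time frame during which $n_{out}$ elements were pulled out of the buffer and $n_{in}$ elements were in the buffer,
+
+\[ \mathit{throughput} = \frac{n_{out}}{n_{in}} \]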
+ +With this we have reduced the problem of overproduction to a problem of controlling the size of a buffer. Here the metric (or control output) is the \textit{throughput}, which we bound to be a number between 0.0 and 1.0 (boundaries inclusive). Since the ideal value for the throughput is 1.0 (meaning that all data is consumed directly), we set this as the setpoint of the feedback system that controls the buffer. Using a custom, incremental controller, we transform the tracking error into a new number of elements to be requested from the source. + +\subsection*{Can this new solution to overproduction be integrated into an existing API for reactive programming?} +Using our feedback API we were able to construct this feedback system in a `reactive' way and thereby integrate it with the reactive API. Integrating this new solution into the newly created RxMobile API turned out to be as easy as wrapping it in an \code{Observable.create}. \section{Future work} +Given the solution to overproduction presented in this thesis, we still see room for improvements and alternative solutions. In this section we lay out some of the work that is still open for further research, focusing first on open work in the field of overflow protection and then on some interesting problems in the field of feedback control. + +\subsection*{Overproduction} +We already touched on the fact that our solution still uses a fixed, constant interval after which the throughput is measured. We acknowledge that an interval is required here, since a continuous measurement does not make sense in the discrete world of computer science. However, it would be interesting to see whether it is possible to make this interval dynamic and control it using another feedback system. For this, a mathematical exploration needs to be undertaken to discover what kind of metric would determine the length of such an interval, before a feedback system can be derived from it.
+ +Another point of interest is the new RxJava 2.0 library\footnote{This complete rewrite of RxJava was released only towards the end of writing this thesis.}, in which the \obs is pure again and does not have anything to do with backpressure. Instead, a \code{Flowable} type is introduced, which wraps an interactive source in a reactive context by applying backpressure. The policy used here is fully abstracted and replaceable with any other policy. It would therefore be interesting to see whether our solution could be fitted in and used as just another of these policies. When implemented in this way, it would also be interesting to do a performance analysis and see how this policy compares to the other available policies. + +In our solution we have considered the consumer to be a single-threaded process that can only consume the elements one by one. This puts a restriction on the kinds of sources this solution can be applied to: since we request a certain number of elements, a hot source cannot be used here. An interesting alternative to look into is to control the number of consumers rather than controlling a buffer size. This is similar to various scheduling and allocation algorithms in distributed computing, but in our opinion it can also be applied to the field of reactive programming. Controlling the number of consumers in this context can mean either a single multi-threaded consumer, which can consume multiple elements at a time, or literally a variable number of single-threaded consumers that work in parallel with each other. We hypothesize that this solution would be suitable not only for cold sources but also for hot sources.
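+A rough sketch of what such a consumer-controlling feedback system could look like is given below. Note that this sketch is purely hypothetical: \code{queueLength}, \code{controller}, \code{spawnWorker} and \code{stopWorker} do not exist in our implementation and only serve to illustrate the idea.
+
+\begin{lstlisting}[style=InlineScalaStyle]
+// hypothetical: take the queue length as the metric and control the
+// number of parallel consumers instead of the size of a buffer
+queueLength                               // Observable[Int], one sample per interval
+  .map(length => setpointLength - length) // tracking error
+  .map(controller.update)                 // e.g. an incremental controller
+  .subscribe(delta =>
+    if (delta > 0) spawnWorker()
+    else if (delta < 0) stopWorker())
+\end{lstlisting}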
+ +\subsection*{Feedback control} + + \todo{discuss the potentials of future work as listed below} \begin{itemize} \item Backpressure From 192b6cd897cd34fa3ba0e9a3f3ee406ee68f1e45 Mon Sep 17 00:00:00 2001 From: Richard van Heest Date: Sat, 22 Oct 2016 14:08:27 +0200 Subject: [PATCH 6/6] the rest of the conclusion/future work --- report/0.1-introduction.tex | 4 ++-- report/6.0-conclusion-and-future-work.tex | 28 ++++------------------- report/report.tex | 2 +- 3 files changed, 8 insertions(+), 26 deletions(-) diff --git a/report/0.1-introduction.tex b/report/0.1-introduction.tex index 0496320..6486d19 100644 --- a/report/0.1-introduction.tex +++ b/report/0.1-introduction.tex @@ -28,8 +28,8 @@ \subsubsection*{How can we implement a \emph{reactive} feedback system that is c \subsubsection*{How can the overproduction problem be reduced to a feedback control problem?} In order to solve the problem of overproduction using feedback control, it is necessary to get out of the context of reactive programming and abstract this into more formal problems that can be solved using the principles of control theory. Based on the feedback system that solves this control problem, a mapping can be created to the original problem of controlling overproduction in a reactive program. -\subsubsection*{Can this new solution to overproduction be integrated into an existing API for reactive programming?} -In order for this solution to be useful in practice, it is important to be able to integrate it with existing API's for reactive programming. To validate this, a clean API is needed that does not yet implement any solutions for overproduction by default. For this purpose we use the newly created RxMobile API \cite{RxMobile}. +%\subsubsection*{Can this new solution to overproduction be integrated into an existing API for reactive programming?} +%In order for this solution to be useful in practice, it is important to be able to integrate it with existing API's for reactive programming.
To validate this, a clean API is needed that does not yet implement any solutions for overproduction by default. For this purpose we use the newly created RxMobile API \cite{RxMobile}. \section*{Outline} \addcontentsline{toc}{section}{Outline} diff --git a/report/6.0-conclusion-and-future-work.tex b/report/6.0-conclusion-and-future-work.tex index f114062..de177fa 100644 --- a/report/6.0-conclusion-and-future-work.tex +++ b/report/6.0-conclusion-and-future-work.tex @@ -29,8 +29,8 @@ \subsection*{How can the overproduction problem be reduced to a feedback control With this we have reduced the problem of overproduction to a problem of controlling the size of a buffer. Here the metric (or control output) is the \textit{throughput}, which we bound to be a number between 0.0 and 1.0 (boundaries inclusive). Since the ideal value for the throughput is 1.0 (meaning that all data is consumed directly), we set this as the setpoint of the feedback system that controls the buffer. Using a custom, incremental controller, we transform the tracking error into a new number of elements to be requested from the source. -\subsection*{Can this new solution to overproduction be integrated into an existing API for reactive programming?} -Using our feedback API we were able to construct this feedback system in a `reactive' way and thereby integrate it with the reactive API. Integrating this new solution into the newly created RxMobile API turned out to be as easy as wrapping it into an \code{Observable.create}. +%\subsection*{Can this new solution to overproduction be integrated into an existing API for reactive programming?} +%Using our feedback API we were able to construct this feedback system in a `reactive' way and thereby integrate it with the reactive API. Integrating this new solution into the newly created RxMobile API turned out to be as easy as wrapping it into an \code{Observable.create}. 
\section{Future work} Given the solution to overproduction presented in this thesis, we still see room for improvements and alternative solutions. In this section we will lay out some of the work that is still open for further research. Here we will first focus on open work in the field of overflow protection and then describe some interesting problems in the field of feedback control. @@ -43,26 +43,8 @@ \subsection*{Overproduction} In our solution we have considered the consumer to be a single-threaded process which can only consume the elements one by one. This puts a restriction on the kinds of sources to which this solution can be applied: since we request a certain number of elements, a hot source cannot be used here. An interesting alternative to look into is to control the number of consumers rather than controlling a buffer size. This is similar to various scheduling and allocation algorithms in distributed computing, but in our opinion it can also be applied to the field of reactive programming. Controlling the number of consumers in this context can either mean a single consumer with a multi-threaded process, which can consume multiple elements at a time, or literally a variable number of single-threaded consumers that work in parallel with each other. We hypothesize that this solution would be suitable not only for cold sources but also for hot sources. \subsection*{Feedback control} +While researching the field of feedback control, it was very surprising to see that this technique is hardly ever used in computer science. Despite being well known and often used in many other fields of science, it was hard to find any related work in our field. A possible explanation for this might be the highly involved mathematical notions that formally describe a feedback system, which are hard or impossible to apply to computer science. More work needs to be done to formalize this technique for this field.
A particular distinction to focus on is the continuous nature of a mathematical feedback system versus the discrete nature that is required in computer science. +On the same note, feedback control provides a simple but powerful solution to a wide range of problems. However, the problems we have seen during our research in this field all look alike. This makes us hypothesize that a formal classification can be made of problems that can be solved using feedback control. Just as some problems can be classified as $P$, $NP$ or $NPC$, we hypothesize that a similar thing can be done for a class of problems $FC$ that can be solved using feedback control. Metrics for runtime and space complexity would be equally interesting to formalize for this class, such that the performance of different feedback systems can be compared. -\todo{discuss the potentials of future work as listed below} -\begin{itemize} - \item Backpressure - \begin{itemize} - \item Push solution - control number of workers with certain metrics. This will presumably also work with hot streams such as UI events and time - \begin{itemize} - \item Queue length - \item Net queue length change - \item In/out ratio per time unit - \end{itemize} - \item Using RxJava2.x, you can apparently define a `backpressure policy' on the new \code{Flowable} type - \item \code{interval} in our feedback system; control it using feedback rather than having it as a constant in \code{from} - \end{itemize} - \item Feedback control - \begin{itemize} - \item It’s basically an algorithm to solve ‘control problems’. How is the problem class defined where feedback control can be used as an easy solution? - \item It’s difficult to tune PID controllers. (refer to blog) Would it be possible to use ML for this process?
- \item The PID controller originally comes from the field of physics and mechanical/electrical engineering and is considered to be the default controller, given its (mathematical) simplicity and quick response to change. Does this still hold for computer science, since we don’t need/use mathematical models? Or is there an alternative, easier to tune, controller that could be considered as default in computer science? (also take into account that the PID controller makes the assumption of a \textit{continuous} world, whereas computer science presumes a \textit{discrete} world) - \item We derived a feedback API based on a transformation of $\obs \Rightarrow \obs$. Would it be possible to also do make a similar API by just adding the feedback operator to the \obs API? This would allow for some form of recursion within the stream, allowing for a whole new class of problems to be solved using reactive programming. - \end{itemize} -\end{itemize} +Another interesting part of control theory that requires more research concerns the controller itself. For feedback systems in physics and engineering, which all assume a \textit{continuous} world in which the feedback system runs, the PID controller is the standard controller to use, due to its simplicity and accuracy. However, this controller is not particularly well-suited for systems that run in a \textit{discrete} world, as is the case in computer science. We hypothesize that a similar, but discrete, controller should exist for these systems. diff --git a/report/report.tex b/report/report.tex index 090a2da..54ecd4a 100644 --- a/report/report.tex +++ b/report/report.tex @@ -31,7 +31,7 @@ \crefname{appsec}{Appendix}{Appendices} -\newcommand{\todo}[1]{\textcolor{red}{\textbf{\large @TODO: #1}}} +%\newcommand{\todo}[1]{\textcolor{red}{\textbf{\large @TODO: #1}}} \newcommand{\code}[1]{\sloppy{\mbox{\texttt{#1}}}} \newcommand{\HRule}{\rule{\textwidth}{0.5mm}}
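The hypothesized discrete counterpart of the PID controller can be grounded in the standard discrete-time (sampled) form found in control-theory textbooks, where the integral term becomes a running sum and the derivative term a backward difference over the sampling period. The sketch below is such a generic textbook form with arbitrary gains; it is not tied to any implementation in this thesis.

```java
// Textbook discrete-time PID controller: at each sampling step the integral
// becomes a running sum and the derivative a backward difference.
public class DiscretePid {
    final double kp, ki, kd, dt; // proportional, integral, derivative gains; sampling period
    double integral = 0.0;
    double previousError = 0.0;

    public DiscretePid(double kp, double ki, double kd, double dt) {
        this.kp = kp; this.ki = ki; this.kd = kd; this.dt = dt;
    }

    // One control step, where error = setpoint - measured output.
    public double update(double error) {
        integral += error * dt;                           // discrete integral: running sum
        double derivative = (error - previousError) / dt; // discrete derivative: backward difference
        previousError = error;
        return kp * error + ki * integral + kd * derivative;
    }

    public static void main(String[] args) {
        DiscretePid pid = new DiscretePid(0.8, 0.2, 0.1, 1.0);
        System.out.println(pid.update(1.0)); // first control step on a unit error
    }
}
```

Even in this form the three gains still have to be tuned by hand, which is exactly the difficulty raised above; a controller that is natively discrete, rather than a sampled continuous one, remains an open question.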