Since not every computation is, or can be, expressed as a linear pipeline of processing stages, Akka Streams also provides a graph-like DSL for building stream processing graphs, in which each node can have multiple inputs and outputs.
The documentation refers to graph operations as junctions: points at which multiple flows are connected, making it possible to perform any kind of fan-in or fan-out.
The flow graph API provides a fairly straightforward abstraction.
It already includes some of the most useful junctions, such as the following:
Merge[In] – (N inputs, 1 output) picks randomly from its inputs, pushing elements one by one to its output
Zip[A,B] – (2 inputs, 1 output) a ZipWith specialised to zipping input streams of A and B into a stream of (A,B) tuples
Concat[A] – (2 inputs, 1 output) concatenates two streams (first consumes one, then the other)
Broadcast[T] – (1 input, N outputs) emits each input element to every output
The documentation also provides a simple but effective example that illustrates how the DSL can be used to express a graph computation while keeping the code declarative and readable.
The following image shows a graph that expresses a computation in which:
The corresponding computation can be implemented as follows:
val g = FlowGraph.closed() { implicit builder: FlowGraph.Builder[Unit] =>
  import FlowGraph.Implicits._
  val in = Source(1 to 10)
  val out = Sink.ignore
  // Junctions must be added to the builder before they can be wired
  val bcast = builder.add(Broadcast[Int](2))
  val merge = builder.add(Merge[Int](2))
  val f1, f2, f3, f4 = Flow[Int].map(_ + 10)
  // Main path, then the second branch from the broadcast into the merge
  in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
              bcast ~> f4 ~> merge
}
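Because the graph above ends in Sink.ignore, its results are never visible. The following is a minimal sketch, in plain Scala collections and without Akka, of the values that would reach the sink. The object name GraphValueModel and its outputs method are hypothetical helpers introduced only for this illustration, and the model ignores asynchrony and back-pressure.

```scala
// Plain-Scala model of the values flowing through the graph (no Akka involved).
// GraphValueModel is a hypothetical helper, not part of any library.
object GraphValueModel {
  // Every stage f1..f4 in the graph adds 10 to its input.
  private val addTen: Int => Int = _ + 10

  def outputs(input: Seq[Int]): Seq[Int] = {
    val afterF1 = input.map(addTen)   // in ~> f1
    val viaF2   = afterF1.map(addTen) // bcast ~> f2
    val viaF4   = afterF1.map(addTen) // bcast ~> f4
    // Merge gives no ordering guarantee between its inputs; plain
    // concatenation is just one of the possible interleavings.
    (viaF2 ++ viaF4).map(addTen)      // merge ~> f3
  }
}
```

Under this model each input element reaches the sink twice, once per branch, and each copy has passed through three stages: GraphValueModel.outputs(1 to 10) holds 20 elements, namely each of the values 31 to 40 twice.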
When building and connecting the components, the compiler checks type correctness, which is really useful. The check that all elements have been properly connected, however, is done at run-time.
The framework also provides the notion of a partial graph. A partial graph is a graph with undefined sources, sinks, or both. It is useful for structuring code into separate components that are later connected to each other. In other words, partial graphs favour code composability.
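As an illustration of a partial graph, here is a sketch adapted from the "pick the maximum of three" example in the Akka Streams 1.0 documentation. It assumes the same 1.0-era API used in the rest of this article (FlowGraph.partial, FlowGraph.closed, ActorMaterializer, and ActorSystem.shutdown from Akka 2.3); later releases renamed these entry points, so treat the exact calls as assumptions tied to that version. The wrapper object PartialGraphSketch and the method maxOf are hypothetical names.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, UniformFanInShape}
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object; API calls assume Akka Streams 1.0.
object PartialGraphSketch {
  // A partial graph: three open inlets and one open outlet, no source or sink.
  val pickMaxOfThree = FlowGraph.partial() { implicit b =>
    import FlowGraph.Implicits._
    val zip1 = b.add(ZipWith[Int, Int, Int](math.max _))
    val zip2 = b.add(ZipWith[Int, Int, Int](math.max _))
    zip1.out ~> zip2.in0
    // Expose the still-unconnected ports as the shape of the component.
    UniformFanInShape(zip2.out, zip1.in0, zip1.in1, zip2.in1)
  }

  // Close the graph by plugging three sources and one sink into the open ports.
  def maxOf(x: Int, y: Int, z: Int): Int = {
    implicit val system = ActorSystem("partial-graph-sketch")
    implicit val materializer = ActorMaterializer()
    val g = FlowGraph.closed(Sink.head[Int]) { implicit b => sink =>
      import FlowGraph.Implicits._
      val pm3 = b.add(pickMaxOfThree) // importing returns the component's shape
      Source.single(x) ~> pm3.in(0)
      Source.single(y) ~> pm3.in(1)
      Source.single(z) ~> pm3.in(2)
      pm3.out ~> sink.inlet
    }
    try Await.result(g.run(), 3.seconds)
    finally system.shutdown() // Akka 2.3 API; assumed for the 1.0 line
  }
}
```

The partial graph is defined once and knows nothing about where its inputs come from, so the same component could be imported into several closed graphs.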
In many cases it is also possible to expose a complex graph as a simpler structure, such as a Source, Sink or Flow, since these concepts can be viewed as special cases of a partially connected graph.
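A sketch of this idea, adapted from the Akka Streams 1.0 documentation, wraps a small zipping graph and exposes it as an ordinary Source. It assumes the 1.0-era Source() { builder => outlet } factory and ActorMaterializer; the object GraphAsSourceSketch and the method firstPair are hypothetical names added for the example.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object; API calls assume Akka Streams 1.0.
object GraphAsSourceSketch {
  // A graph with exactly one open outlet can be exposed as a Source.
  val pairs = Source() { implicit b =>
    import FlowGraph.Implicits._
    val zip = b.add(Zip[Int, Int]())
    def ints = Source(() => Iterator.from(1))
    ints.filter(_ % 2 != 0) ~> zip.in0 // odd numbers
    ints.filter(_ % 2 == 0) ~> zip.in1 // even numbers
    zip.out                            // the single exposed port
  }

  // Run the source like any other and take its first element.
  def firstPair(): (Int, Int) = {
    implicit val system = ActorSystem("graph-as-source-sketch")
    implicit val materializer = ActorMaterializer()
    try Await.result(pairs.runWith(Sink.head), 3.seconds)
    finally system.shutdown() // Akka 2.3 API; assumed for the 1.0 line
  }
}
```

Callers of pairs see only a Source of tuples; the Zip junction and the two filtered inputs stay hidden inside it.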
One last feature covered in this section is that Akka Streams allows cycles in flow graphs. This feature is potentially dangerous, since it may lead to deadlocks or liveness issues.
Problems quickly arise when there are unbalanced feedback loops in the graph. Since Akka Streams processes items in a bounded manner, if a cycle holds an unbounded number of items (for example, when items are always reinjected into the cycle), back-pressure will deadlock the graph very quickly.
A possible strategy to avoid deadlocks in the presence of unbalanced cycles is to introduce a dropping element on the feedback arc, which will drop items when back-pressure kicks in.
A good example from the documentation is the following, where buffer() is used with a capacity of 10 items and a dropHead strategy.
FlowGraph.closed() { implicit b =>
  import FlowGraph.Implicits._
  val merge = b.add(Merge[Int](2))
  val bcast = b.add(Broadcast[Int](2))
  source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore
  // Feedback arc: when the buffer is full, drop its oldest element instead of back-pressuring
  merge <~ Flow[Int].buffer(10, OverflowStrategy.dropHead) <~ bcast
}
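To make the dropHead behaviour concrete, here is a minimal plain-Scala model of a bounded buffer that discards its oldest element when a new one arrives and the buffer is full. It is only an illustration of the policy, not Akka's implementation, and DropHeadBuffer and offer are hypothetical names.

```scala
import scala.collection.immutable.Queue

// Plain-Scala model of a bounded buffer with a drop-head policy.
// Illustration only: DropHeadBuffer is hypothetical, not Akka's implementation.
final case class DropHeadBuffer[A](capacity: Int, items: Queue[A] = Queue.empty[A]) {
  require(capacity > 0, "capacity must be positive")

  // Add a new element; when the buffer is full, discard the oldest one first.
  def offer(a: A): DropHeadBuffer[A] = {
    val room = if (items.size >= capacity) items.tail else items
    copy(items = room :+ a)
  }
}
```

Offering the numbers 1 to 13 to a buffer of capacity 10, for instance with (1 to 13).foldLeft(DropHeadBuffer[Int](10))((buf, x) => buf.offer(x)), leaves the elements 4 to 13: the three oldest items were dropped, and the producer was never blocked. This is why the feedback arc can no longer stall the cycle, at the cost of losing elements.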
An alternative approach is to build a cycle that is balanced from the beginning, by using junctions that balance the inputs. The previous example can thus also be solved with:
- a ZipWith() junction, which balances the feedback loop with the source
- a Concat() combined with another Source() holding an initial element that performs the initial "kick-off"
In fact, using a balancing operator on a feedback loop requires an initial element in the loop; otherwise we run into a "chicken-and-egg" problem.
FlowGraph.closed() { implicit b =>
  import FlowGraph.Implicits._
  val zip = b.add(ZipWith((left: Int, right: Int) => left))
  val bcast = b.add(Broadcast[Int](2))
  val concat = b.add(Concat[Int]())
  val start = Source.single(0)
  source ~> zip.in0
  zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore
  zip.in1 <~ concat <~ start
  concat <~ bcast
}