Skip to content

Latest commit

 

History

History
166 lines (123 loc) · 13.8 KB

processor.adoc

File metadata and controls

166 lines (123 loc) · 13.8 KB

The Processor

This chapter is my favorite one. Not because it’s going to be the most useful, but because it’s the one that took the longest for me to figure out. You see, I love backoffice code. I love messaging, and integration. I love analytics. I love workflow. I love stream and batch processing. I love workflow engines and business process management. I love grid computing. I love short-lived tasks like CRON jobs. I love all the sorts of stuff that has no business being anywhere near an HTTP request or taking up space on an HTTP webservice. We used to call these sorts of things "backoffice" jobs. And you’ve probably built some of these in your career, too.

And I didn’t know how OAuth fit in this world of backoffice code. I mean, just look at the Spring portfolio! We’ve got Spring Integration, Spring AMQP, Spring for Apache Kafka, Spring Cloud Data Flow, Spring Cloud Task, Spring Batch, and even Spring Shell. You can build all sorts of cool stuff with those libraries and never see an HTTP header! So how does OAuth fit here? And, more precisely, how does Spring Security’s OAuth support plugin? We’ve already met the usual suspects: spring-boot-starter-oauth-client, spring-boot-starter-resource-server, etc. But those all assume the presence of an HTTP server and the Spring Security web filter chain.

In this chapter we’re going to expand our sample application a bit with some Spring Integration code that will receive a message that the API (the resource server) will send. Each message will contain the JWT token associated with the authenticated user and it’ll contain a payload tat the processor will.., you know, process. In this example, we’ll imagine that this processor is busy doing the work of sending emails. We’re not going to actually write that bit of the code. But it is a good example. After all, sending email can sometimes take a long time and we don’t want to keep the API busy doing this when it should be fielding HTTP requests.

When the message arrives at this processor module, we’ll validate the JWT token by talking to the Spring Authorization Server (our OAuth IDP) through its issuer URI. Does this sort of sound familiar? It should! We’re going to basically do the same trick as the resource server did, albeit a bit more granuarly. Well get to see how the resource server support does some of its work. Its good to know this because, outside of an HTTP environment, there’s no one-sized fits all approach. It’s convenient then that we can easily plug this stuff in ourselves.

(Re-) Introducing Spring Integration

Let’s talk again about Spring Integration. we looked at it ever so briefly when we introduced the API, but let’s review some basics.

Remember this code from the API we built earlier?

link:api/src/main/java/bootiful/api/EmailRequestsIntegrationFlowConfiguration.java[role=include]

It’s a bean of type IntegrationFlow, from the Spring Integration project. The bean describes how we handle messages intended for a RabbitMQ broker: messages come, then we turn it into JSON, then we send it over AMQP. Simple.

Spring Integration is an old project, from 2007. The core conceit fo Spring Integration is that as we move forward in time the body of systems and services with which we need to integrate - to maximally retain value - grows, and the protocols and paradigms required for integration also grow. Spring Integration is an enterprise application technnology. It’s designed to help glue systems together, particularly those things that wouldn’t otherwise know about and work with each other.

In 2004, Gregor Hohpe and Bobby Woolf wrote the book Enterprise Integration Patterns, which gave us the names for the patterns typical of integration solutions. Broadly, the book said, there are four different kinds of integration styles.

  • RPC: in this style, network services are made to work like a local object. In such a style, a client could invoke a method on a local Java object and have that translated into remote procedure calls on another object on another object, presumably running on another host. This style feels simple but it hides the reality of the network - that it will fail. It makes assumptions that it shouldn’t, namely that the service will always be available. If the service - the consumer - is not available then has to retry or abandon the integration. Because of these design tradeoffs, RPC is a poor choice for service integration.

  • Shared Database: in this style, a client connects the same database (e.g.: Oracle, PostgreSQL, MongoDB, etc), writes data there that another client then reads. This is fragile because a schema change by one client might break another client. This approach completely violates the principles of encapsulation, exposing the peculiarities data storage to consumers, and is therefore a poor choice for service integration. There’s a reason nobody ever says "put your best liver forward!" It’s nobody’s business what your liver looks like! They should be shaking your hand, instead. The same is true in integration: don’t share too much.

  • File synchronization: in this style, a client deposits a file on a filesystem (NFS, FTP, FTPS, SFTP, SMB, etc.) that a client then consumes. This approach works alright but care must be taken so that the consumer of a file doesn’t start processing it before the producer has finished writing it. Additionally, this approach lacks any sort of sophistication around message delivery (once and only once, transactions, message rollback) or routing.

  • Messaging: in this style, integration is done in terms of a messaging system like Apache Kafka or RabbitMQ. This is usually the best approach if you can get access to it. Messaging systems support transactions, they can be made to ensure a message is delivered, that it is delivered at most once, or at least once, etc. It can hanndle routing, allowing you to send the message to different consumers as required. And of course it does not couple producer or consumer; a consumer can send a message to the messaging system, and it’ll be recorded there. When the consumer is available and able, it can read and process the message. We can say that this style of integration is the most decoupled: producers and consumers don’t need to both be available at the same time; producers and consumers only see and agree upon message payloads, and not the internals of their state management schemes.

Generally, speaking, messaging is the most flexible approach to building and integrating systems. So it is that Spring Integration models everything as Message objects that pass through MessageChannel objects. A MessageChannel is the connective tissue between components that act on Message objects, in a sort of pipeline. These components are written in terms of the `Message`s they accept and the `Message`s they produce. They’re otherwise usually stateless. In a way, Spring Integration encourages a lot of the same discipline and conventions as any functional programming language might. You’re encouraged to write your business logic in terms of granular, composable, and reusable functions.

Where do these messages come from, and where do they go? Well, the real world of course! They have to come from somewhere. It is Spring Integration’s job to connect our code to events in the real world and translate them into Message objects: a microwave turned on (MQTT), a new file appeared in an FTP service, a new row appeared in a SQL database, a new email was sent to an inbox, a message arrived on a JMS destination, etc. It does this work with adapters which adapt events into Message objects. Each Message typically contains a payload (the inbound file adapter might contain a java.io.File payload, for example) and headers telling us about the payload (the folder in which the file was found, or the timestamp of when the message was produced).

Once we have a Message ,we can do all sorts of things to it. We could split it into smaller messages, route it to other handlers, add information to it, filter it, etc.

And then when we’re finally done with it, we use an outbound adapter - which does the reverse of an inbound adapter -to send the message onward to some place in the real world.

So, to review the review: events come via inbound adapters, they’re turned into Message objects with headers and payloads. In this form they’re processed, and ultimately sent out via outbound adapters.

Now back to our regularly scheduled programming.

Defining RabbitMQ Infrastructure

First thing’s first: we’re using RabbitMQ. In RabbitMQ, you send messages to an exchange which then can route it anyway it wants. In our case, it’s going to be sent to a single queue, which is where consumers will know to look for it. So, we have an exchange and a queue, and they’re bound together through something called a binding. We’ll use the Spring AMQP project to make short work of defining these things. If we define them as beans, Spring AMQP will automaticall create the real structures on the RabbitMQ broker.

link:processor/src/main/java/bootiful/processor/AmqpConfiguration.java[role=include]

You’ll notice that this code uses a constant variable I’ve defined. There are a few others…​

A Few Well Known Constants

There are a few things that I’ve defined as constants: a header name, a MessageChannel bean ID, and the RabbitMQ destination.

link:processor/src/main/java/bootiful/processor/Constants.java[role=include]

The Integration

Now we’re into the meat of the example, the actual Spring Integration code.

In Spring Integration, an IntegrationFlow is the definition of a processing pipeline. You may have more than one IntegrationFlow in your application. IntegrationFlow objects chain together different components that act on the Message objects within the flow. You can route messages from one IntegrationFlow to another using MessageChannel instances.

Conceptually, all we want to do is take a message from the inbound AMQP (that’s the protocol that RabbitMQ speaks) in and then print out the message. One might send an email or do something useful here, but we’ll leave that as an exercise for another day…​ And yet, I’ve written it a little more obtusely: we have one IntegrationFlow that takes the message rom AMQP and then stuffs the message into a MessageChannel. It pops out the other side and then we print out the message’s payload and headers. Why the indirection? Why add the MessageChannel into the mix, I hear you query. MessageChannel objects can have interceptors that can, in effect, veto messages. So we’ll configure some interceptors to act on the message, validate the attached JWT token, and if it doesn’t match, to reject the message before it gets to whatever important business logic we’ve got downstream of thee MessageChannel.

Let’s see it all in action.

link:processor/src/main/java/bootiful/processor/IntegrationConfiguration.java[role=include]
  1. the first IntegrationFlow defines an AMQP Inbound adapter which listens for new Messages on our auto configured RabbitMQ connection. As soon as a message comes in, it gets sent to the next step in the INtegrationFlow. IN this case, that next step is to travel through an injected MessageChannel to another IntegrationFlow.

  2. the second IntegrationFlow takes whatevers been stuffed into the MessageChannel and passews it to the next step, which is a simple handler that inspects the message payload and headers.

  3. both IntegrationFlow objects are connected by the MessageChannel whoe bean ID is Constants.REQUESTS_MESSAGE_CHANNEL, and whose definition we see here. It’s a bit complicated but this is where we do the work of validating the JWT token.

  4. most of the important woerk is done in these interceptors. Each interceptr can inspect, transform, or reject the Message objects flowign through the MessageChannel on which they’re configured. I wrote this first interceptor. We’ll look at it shortyl, but suffice it to say that it in turn is using the injected JwtAuthenticationProvider that is provided by Spring Security and whose configuration we’ll examine momentarily to do the work of validating the JWT token against the Spring Authorization Server. If the token is valid, the interceptor creates a new message with a valid Spring Security Authentication in the header. If the token is not valid, then the interceptor will create a new message with a null value for the header.

  5. this next interceptor hoists the Authentication from the message header and puts it into the Spring Security SecurityContextHolder, which is a well-known ThreadLocal-like holder of the current thread’s authenticated user. Now, all the other machinery in Spring Security that consults this thread local will work correctly.

  6. the final interceptor does the actual check, rejecting the message if the current thread doesn’t have a valid, authenticated Authentication.

The JwtAuthenticationInterceptor leaves nothing to the imagination:

link:processor/src/main/java/bootiful/processor/JwtAuthenticationInterceptor.java[role=include]
  1. We’ve defined this bean elsewhere and injected it here

  2. in which header shall we place the authentication once we’ve validated the token?

  3. extract the token from the header

  4. use the AuthenticationProvider to talk to the Spring Authorizatoin Server.

  5. if the JWT is in fact valid then we’ll create a new Message, cloning the headers and payload from the incoming message, but specufying one new header, whose value is an object of type Authentication.

  6. if the JWT is not valid, then create a new Message with a null header value.