
MTFH Reporting Data Listener

This listener implements an AWS Lambda function that receives messages when any updates are made to the TenureInformationApi and sends the updates to a Kafka queue.

Here is the process for how the data is saved to Kafka:

Retrieving data from the API

  1. When one of the events in this switch statement is raised, this listener is triggered.
  2. The listener can then call the relevant API to retrieve the data that needs to be saved. You could potentially use this Shared NuGet Package to make calls to any APIs. This README explains how the listener calls the relevant API.
    • If the Tenure doesn't exist, the listener throws an exception.

Get Schema from schema registry

  1. The listener will then get the schema from a schema registry via an HTTP call; the schema registry is managed in this repository. A rough sketch of such a call is shown below.
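
    As an illustration only (not the listener's exact implementation), fetching the latest schema for a subject from a Confluent-style registry over HTTP could look like the sketch below; the registry URL and subject name are placeholders:

        // Minimal sketch: fetch the latest registered schema for a subject over HTTP.
        // The registry URL and subject name passed in are placeholders, not real values.
        using System.Net.Http;
        using System.Text.Json;
        using System.Threading.Tasks;
        using Avro;

        public static async Task<Schema> GetLatestSchemaAsync(string registryUrl, string subject)
        {
            using var client = new HttpClient();
            var response = await client.GetStringAsync($"{registryUrl}/subjects/{subject}/versions/latest");

            // The registry responds with JSON containing the schema as an escaped string,
            // e.g. { "subject": "...", "version": 1, "schema": "{...}" }
            var json = JsonSerializer.Deserialize<JsonElement>(response);
            return Schema.Parse(json.GetProperty("schema").GetString());
        }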

Convert data to Avro

  1. Then using this schema, an AVRO generic record is created holding the tenure details retrieved from the tenure API.

    Kafka only accepts the following data types: byte[], ByteBuffer, Double, Integer, Long, String. If you need to send other data types you will first need to serialize the data into one of these types. Below are some code examples of how we have done this.

    Nullable types (Union):

    Here we check whether the field value is null and, if it is, add null to the record. This needs to be done as Kafka doesn't accept empty strings. If the value is not null, we remove the nullable part from the schema and continue as normal.

         // If the value is null, add null to the record and stop processing this field.
         if (fieldValue == null)
         {
             record.Add(field.Name, null);
             return;
         }

         // Otherwise, work with the non-null part of the union schema.
         fieldSchema = GetNonNullablePartOfNullableSchema(field.Schema);
         fieldType = fieldSchema.Tag;

         ...

         // Parses the nullable (union) schema as JSON and returns the branch that isn't "null".
         private Schema GetNonNullablePartOfNullableSchema(Schema nullableSchema)
         {
             var jsonSchema = (JsonElement) JsonSerializer.Deserialize<object>(nullableSchema.ToString());
             jsonSchema.TryGetProperty("type", out var unionList);
             var notNullSchema = unionList.EnumerateArray().First(type => type.ToString() != "null").ToString();
             return Schema.Parse(notNullSchema);
         }

    Enums:

         new GenericEnum((EnumSchema) fieldSchema, fieldValue.ToString())

    DateTime Objects:

    Converts to an int holding a Unix timestamp.

        private int? UnixTimestampNullable(object obj)
        {
            var date = (DateTime?) obj;
            return (int?) (date?.Subtract(new DateTime(1970, 1, 1)))?.TotalSeconds;
        }

    Arrays:

        var fieldValueAsList = (List<ExampleList>) fieldValue;
        var itemsSchema = GetSchemaForArrayItems(fieldSchema);
        var recordsList = fieldValueAsList.Select(listItem => PopulateFields(listItem, itemsSchema)).ToArray();
        record.Add(field.Name, recordsList);
        
        // An Avro array schema looks like {"type": "array", "items": { ... }};
        // parse it as JSON and return the schema of its items.
        private Schema GetSchemaForArrayItems(Schema arraySchema)
        {
            var jsonSchema = (JsonElement) JsonSerializer.Deserialize<object>(arraySchema.ToString());
            jsonSchema.TryGetProperty("items", out var itemsSchemaJson);
            return Schema.Parse(itemsSchemaJson.ToString());
        }

    Once the data types have been converted, you can create the GenericRecord by adding each value to the record; this can be done with a loop and reflection:

        public GenericRecord PopulateFields(object item, Schema schema)
        {
            var record = new GenericRecord((RecordSchema) schema);
            ((RecordSchema) schema).Fields.ForEach(field =>
            {
                PropertyInfo propInfo = item.GetType().GetProperty(field.Name);
                if (propInfo == null)
                {
                    Console.WriteLine($"Field name: {field.Name} not found in {item}");
                    return;
                }
    
                var fieldValue = propInfo.GetValue(item);
                var fieldSchema = field.Schema;
                var fieldType = field.Schema.Tag;
    
                //Add any data type conversion required here
    
                record.Add(field.Name, fieldValue);
            });
    
            return record;
        }
  2. Lastly, the record created above is sent to Kafka.

    Schema Registry

    A schema registry URL is required to push data to Kafka in AVRO format. The schema registry is hosted separately from the Kafka brokers. The Kafka producer publishes the data to a Kafka topic, using the schema registry to serialize the data into AVRO according to the schema for that topic; the schema registry is therefore used to store the schemas for a topic. You can store a schema for both the key and the value of a message, but we only send values in AVRO format.

    Send Data to Kafka

    Messages are sent to Kafka using Kafka topics, which the consumer subsequently reads from. A Kafka topic is created for the producer from its schema name if one doesn't already exist; otherwise it uses the schema name provided as an environment variable, which is set in Parameter Store in the producer's AWS account. We then use Producer.Flush(TimeSpan.FromSeconds(10)) to send the messages through Kafka to the consumer immediately. There is a 10-second maximum timeout on the listener, meaning it will wait until either all the messages have been sent or 10 seconds have passed. If a maximum timeout were not set, it would wait until all the messages have been sent, and if there were an issue whilst sending messages it would likely hang until the Lambda timeout, which we don't want.
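
    As a rough sketch (not the listener's exact code), producing the generic record to a topic with Confluent.Kafka and the schema registry, then flushing with a 10-second timeout, could look like this; the broker address, registry URL and topic name are placeholders:

        // Minimal sketch assuming the Confluent.Kafka and Confluent.SchemaRegistry.Serdes.Avro packages.
        // Broker address, registry URL and topic name are placeholders.
        using System;
        using Avro.Generic;
        using Confluent.Kafka;
        using Confluent.Kafka.SyncOverAsync;
        using Confluent.SchemaRegistry;
        using Confluent.SchemaRegistry.Serdes;

        var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = "http://schema-registry:8081" });
        var producerConfig = new ProducerConfig { BootstrapServers = "broker:9092" };

        using var producer = new ProducerBuilder<string, GenericRecord>(producerConfig)
            .SetValueSerializer(new AvroSerializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
            .Build();

        // 'record' is the GenericRecord built in the previous step.
        producer.Produce("tenure-topic", new Message<string, GenericRecord> { Value = record });

        // Block until either every queued message has been delivered or 10 seconds have passed.
        producer.Flush(TimeSpan.FromSeconds(10));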

    Kafka Images

    In order to ensure high availability and improve the fault tolerance of the event streaming process, we deploy multiple brokers and set the replication factor for Kafka topics to a value greater than 1. This means that copies (replicas) of the data are spread across the brokers, so that if one of the brokers goes down or fails, another broker can take over and serve the request.
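
    As an illustrative sketch only (topic creation is normally handled by infrastructure rather than by this listener), creating a topic with a replication factor greater than 1 via Confluent.Kafka's admin client could look like this; the broker address, topic name, partition count and replication factor are placeholder values:

        // Minimal sketch using Confluent.Kafka's admin client. All values below are placeholders.
        using Confluent.Kafka;
        using Confluent.Kafka.Admin;

        using var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = "broker:9092" }).Build();

        await adminClient.CreateTopicsAsync(new[]
        {
            new TopicSpecification
            {
                Name = "tenure-topic",   // placeholder topic name
                NumPartitions = 3,
                ReplicationFactor = 3    // > 1, so replicas are spread across the brokers
            }
        });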

See this diagram for a visual representation of the listener's process.

Stack

  • .NET Core as a web framework.
  • xUnit as a test framework.

Contributing

Setup

  1. Install Docker.
  2. Install AWS CLI.
  3. Clone this repository.
  4. Rename the initial template.
  5. Open it in your IDE.

Development

To serve the application, run it using your IDE of choice; we use Visual Studio CE and JetBrains Rider on Mac.

Note: When running locally, the appropriate database connection details are still needed.

The application can also be served locally using docker:

  1. Add your security credentials to the AWS CLI.
$ aws configure
  2. Log into AWS ECR.
$ aws ecr get-login --no-include-email
  3. Build and serve the application. It will be available on port 3000.
$ make build && make serve

Release process

We use a pull request workflow, where changes are made on a branch and approved by one or more other maintainers before the developer can merge into the master branch.

Circle CI Workflow Example

Then we have an automated six-step deployment process, which runs in CircleCI.

  1. Automated tests (xUnit) are run to ensure the release is of good quality.
  2. The application is deployed to development automatically, where we check our latest changes work well.
  3. We manually confirm a staging deployment in the CircleCI workflow once we're happy with our changes in development.
  4. The application is deployed to staging.
  5. We manually confirm a production deployment in the CircleCI workflow once we're happy with our changes in staging.
  6. The application is deployed to production.

Our staging and production environments are hosted by AWS. We deploy to production for each feature/config change merged into the release branch.

Creating A PR

To make changes to code easier to understand when being reviewed, we've added a PR template. When a new PR is created on a repo that uses this API template, the PR template will automatically fill in the "Open a pull request" description textbox. The PR author can edit and change the PR description, using the template as a guide.

Static Code Analysis

FxCop runs code analysis when the Solution is built.

Both the API and Test projects have been set up to treat all warnings from the code analysis as errors and therefore, fail the build.

However, we can select which errors to suppress by setting the severity of the responsible rule to none, e.g. dotnet_analyzer_diagnostic.<Category-or-RuleId>.severity = none, within the .editorconfig file. Documentation on how to do this can be found here.

Testing

Run the tests

$ make test

Agreed Testing Approach

  • Use xUnit, FluentAssertions and Moq
  • Always follow a TDD approach
  • Tests should be independent of each other
  • Gateway tests should interact with a real test instance of the database
  • Test coverage should never go down. (See the test project readme for how to run a coverage check.)
  • All use cases should be covered by E2E tests
  • Optimise when test run speed starts to hinder development
  • Unit tests and E2E tests should run in CI
  • Test database schemas should match up with production database schema
  • Have integration tests which test from the DynamoDb database to API Gateway

Data Migrations

A good data migration

  • Records failure logs
  • Is automated
  • Is reliable
  • Runs as close to real time as possible
  • Has observable monitoring in place
  • Does not affect any existing databases

Contacts

Active Maintainers