Default Jackson serde in Spring

Problem

We’re using Spring in most of our Java applications, and rely on Swagger for a lot of API calls. Now, having all the jackson annotations makes everything easy – until it becomes annoying, like this:

    @JsonFormat(pattern = ModelConst.JSON_DATE_FORMAT)
    private LocalDate startDate;
    @JsonFormat(pattern = ModelConst.JSON_DATE_FORMAT)
    private LocalDate endDate;
    @JsonFormat(pattern = ModelConst.JSON_DATE_FORMAT)
    private LocalDate paymentDate; 
    @JsonFormat(pattern = ModelConst.JSON_DATE_FORMAT)
    private LocalDate registrationDate; 

Now, having one of these is nice and tidy. Having them all over the model code is annoying, to say the least.

So, what do you do?

Well, with Jackson you can add custom serializers to the ObjectMapper object. Problem is, we’re using spring, so we don’t really have access to the ObjectMapper object used to do the serialization/deserialization.

The first option was to generate a bean to generate the ObjectMapper , and assign the serializers to it. However, there’s a better way – We can use a configuration bean supplied by Spring:

@Configuration
public class AppConfig {
 
    private static final String dateFormat = "yyyy-MM-dd";
    private static final String dateTimeFormat = "yyyy-MM-dd HH:mm:ss";
 
    @Bean
    public Jackson2ObjectMapperBuilderCustomizer jsonCustomizer() {
        return builder -> {
            builder.serializers(new LocalDateTimeSerializer(
                DateTimeFormatter.ofPattern(DATE_TIME_FORMAT)));
            builder.serializers(new LocalDateSerializer(
                DateTimeFormatter.ofPattern(DATE_FORMAT)));
        };
    }

}

This way, we don’t need any of the annotations, since we’ve set the default parsing.

    private LocalDate startDate;
    private LocalDate endDate;
    private LocalDate paymentDate; 
    private LocalDate registrationDate; 

Some kafka retention lessons (we learned the hard way)

We’re using Kafka (2.0 on cluster, Java client 1.1) as our messaging backbone. A different cluster is deployed in each environment, and we recently started seeing some weird behaviour in one of our user acceptance environments.

First Problem: Old messages are suddenly reprocessed.

Once in every few version releases, we suddently saw some services re-processing old messages. After a lot of head banging and tedious head scratching, we found out that in Kafka, the retention on the offsets and the retention on the messages is not necessarily the same.

What does it mean? Well, when a messsage is sent to a specific Kafka topic, it’s retained as long as the topic retention. So, if our topic retention is 1 week, then after 1 week it will no longer be available.

The consumer offset, however, is a different story. The consumers offsets are saved in an internal topic called __consumer_offsets, and their retention time is defined in the parameter offsets.retention.minutes in the broker config with a deafult of 24 hours.

So what happened to us is this: Our messages retention was set to 2 weeks, and the offsets retention was 24 hours. After a period of not using the system, we deployed a new version. Once the new version was up, it queried the Kafka topic for it’s latest offset. However, the consumer_offset of this application id was already deleted, and the default behaviour is to read from the begining of the stream – which is exactly what happened to us: This is why we were consuming old messages when we released new versions, and it would only happen if we released versions after more than 24 hours and less than 2 weeks.

Second Problem: The producer attempted to use a producer id which is not currently assigned to its transactional id

This one was even more annoying. We’re using Kafka streams api, which promises an exactly-once message processing. Every once in a while, we’d get the above error after the message has been proccessed. This would cause the Kafka stream to shut down, the app to restart – and then, to process the same message again(!).

Now, this was extremly weird. First of all, it was a violation of our "exactly-once" constraint. In addition, we had no idea what it means!

Lately we started also seeing what seems to be a related error: org.apache.kafka.common.errors.UnknownProducerIdException: Found no record of producerId=16003 on the broker. It is possible that the last message with the producerId=16003 has been removed due to hitting the retention limit.

So we started thinking – what is happening to our producer ids?

Then we discovered this:

transactional.id.expiration.ms

The maximum amount of time in ms that the transaction coordinator will wait before proactively expire a producer’s transactional ID without receiving any transaction status updates from it.

Type: int

Default: 604800000

604800000 ms is 7 days. So basically, if we’ve had a streaming application that had no traffic for 7 days, it’s producer metadata was deleted – and that’s the behaviour we’ve been seeing: the application consumed the message, processed it – and when it tried to commit the transaction and update the offset, it failed. This is why we processed, crushed, and re-processed.

Bottom line

Kafka is a tool built for massive data streaming, and it’s defaults are organized around it. Both these issued occured because this specific environment’s usage pattern is random and is not corresponding with the cdefault configuration.

AWS Certified Solutions Architect – Associate

I’ve been doing a Udemy course as a preparation for the AWS Certified Solutions Architect – Associate. These are my summary notes

AWS Certified Solutions Architect – Associate

Exam

  • 130 minutes
  • 60 questions
  • Results are between 100 – 1000, pass: 720
  • Scenario based qestions

IAM

  • Users
  • Groups
  • Roles
  • Policis

Users are part of Groups Resources have Roles : i.e, for an instance to connect to S3, it needs to have a role All the User groups and Roles get their permissions are through Policies, which are defined by json:

# God mode policy
{
    "Version":"2019-01-01",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "*",
            "Resource": "*"
        }
    ]
}

(Creating policies is not part of the exam)

General

  • IAM is cross-regional
  • "root account" is the account created with the first setup of the AWS account, and has complete Admin access.
  • New users have no permissions until assigned
  • New users are assinged Access Key Id and Secret Access Keys when created, for the api access.

S3

  • Key – Value Object based, with metadata and versioning
  • Has access control lists
  • Max 5TB file size
  • Buckets are universal namespace (https://s3-{region}.amazonaws.com/{bucket})

Consistency model:

  • Read After Write consistency – object will be availabe for readdirectly after being written
  • Eventual consistency for overwrite PUTS and DELETES

Storage Tiers/Classes

  • S3 Standatrd – 99.99% availability, 99.999999999% durability, cross-devices, cross-facilities redundancy, designed to sustain loss of 2 facilities at the same time.
  • S3 – IA (infrequently access) – for data accessed less frequently. Lower storage fee, but has a retrieval fee. S3 One Zone – IA : the same as IA, only in 1 AZ. (cheaper)
  • Glacier: Very cheap, archival only. Standard retrieval time takes 3 – 5 hours.

Cross Region Replication (CRR)

  • Requires versioning enaled on the source bucket

CloudFront

  • Edge Location – the location the content will be cached: Per AWS Region (They are not read only, you can write to them too, and they will replicate to the origin and from there to others)
  • Clearing cache cost money 🙂
  • Origin – The original file location: S3 bucket, EC2 instance, ELB, or Route53
  • Distribution – all the locations of the Edges you defined
  • Can distribute dynamic, static, streaming and interactive content (Web Distribution: most common, for websites; RTMP – media streaming)

EC2

Placment groups

Two types:

  1. Cluster placment group – A group of instances within a single AZ that need low latency / high throughput (i.e cassandra cluster). Only available for specific types.
  2. Spread placment group – A group of instances that need to be place seperatly from each other
  • Placment group name myst be unique within aws account
  • Only available for certain instance types
  • Recommended to use homogenous instances within placment group
  • You can’t merge placment groups
  • You can’t move an exisitng instance to a placment group, only create it into it

EFS

  • Supports NFSv4
  • Only pay for used storage
  • Scales up to petabytes
  • Support thousands of concurrent NFS connections
  • Data is stored across multiple AZ within region

Route 53

DNS overview

  • NS – Name Server record. Meaning, if I go to helloRoute53gurus.com, and I’m the first one to try it in my ISP, then the ISP server will ask the .com if it has NS recored for helloRoute53gurus. The .com will have a record that maps it to ns.awsdns.com. So it’ll go to ns.awsdns.com , which will direct it to Route53..\
  • A – short for Address – that’s the most basic record, and it’s the IP for the url
  • TTL – time to live – how long to keep in cache
  • CNAME – resolve one domain to another (can’t be used for ‘naked’ domain names, e.g: ‘www.google.com’ )
  • Alias – unique to Route53, the may resource records to Elastic Load Balancer, CloudFron, or S3 bucket that are configured as websites. They work like CNAME (www.example.com -> elb1234.elb.amazonaws.com)
  • MX record – email records
  • PTR Records – reverse lookups

ELB do not have predefined IPv4 addresses, you resolve them using a dns name. So basically, if you have the domain "example.com" and you want to direct it’s traffic to an ELB, you need to use an Alias (not a cname, because it’s a naked domain name!, and not an A record because it has no IP)

Routing Policies

  • Simple Routing – 1 record with multiple ips addresses, randomly returned. No health checks
  • Weighted Routing – 1 record with N% goes to one rcecord, and M% to another and so forth
  • Latency Based Routing – Route 53 will send to the region with the lowest latency
  • Failover Routing – Health check based Primary/Secondary routing: if the primary instance fails (health check = false), directs to the secondary
  • Geolocation Routing – config which geo location goes to which instance
  • Multivalue Answer Routing – Several records, each with ip addresses, and health check for each resource. The ips will return randomlly, so it’s good for disparsing traffic to different resources.

VPC

  • NAT Gateways – scaled up to 10G, no need to patch/add security groups/assign ip (automatic), they do need to be updates in the routing table (so they can go out via igw)
  • Network ACL –
    • It’s like a SG, in the subnet level.
    • Each subnet is associated with one, but default it’s blocking all in/out bound traffic. you can associate multiple subnets to the same ACL, but only 1 ACL per subnet.
    • The traffic rules are evaluated from the lowest value and up.
    • Unlike SG, opening port 80 for incoming will not allow outbound response on port 80. If you want to communicate on port 80, you’d have to define rule both for inbound and outbound. (Otherwise, it’ll go in and not out)
    • You can block IP addresses using ACL, you can’t with SG
  • ALB – you need at least 2 public subnets for an Application Load Balancer

Application Services

SQS

  • Distributed Pull based Messaging queue
  • Up to 256 Kb messages
  • Default retention: 4 days, max 14 days
  • Default promisese "at-least-once", "FIFO" promises exactly once with ordering
  • Can poll with timeout (like kafka)
  • Visibility – once message is consumed, it’s marked as "invisible" for 30 seconds (default, max is 12 hours), and if it’s not marked as "read" within that time frame, it returns to be visible and re-distributed to another consumer.

SWF – Simple Workflow Service

  • Kind on amazon ETL system, with Workers (who process jobs) and Deciders (who control the flow of jobs). The system enables dispatching of jobs to multiple workers (which makes it easily scalable), tracking the jobs status, and so forth.
  • SWF keeps track of all the tasks and events in an application (in SQS you’d have to do it manually)
  • Unlike SQS, In SWF a task is assigned only once and never duplicated (What happens if the job fails? IDK).
  • SWF enables you to incorportae human interaction – like, if someone needs to approve received messages, for example

SNS – Simple Notifications Services

Delivers notification too:

  • Push notifications

  • SMS

  • Email

  • SQS queue

  • Any http endpoint

  • Lamda functions

  • Messages are hosted in multiple AZ for redundancy

Messages are agregated by Topics, and recipients can dynamically subscribe to Topics.

Elastic Transcoder

  • Convert video files between formats – like formatiing video files to different formats for portable devices

API Gateway

Basically a front API for your lamda/internal APIs, with amazon capabilities:

  • API Caching – caching responses to an request api with TTL
  • Throttling requests to prevent attacks

Kinesis

3 types:

  • Streams – Kafka (Retention : up to 7 days) – Shards = partitions (?)
  • Firehose – Fully automated, no consumers, No retention, No shards – Can be written to s3 / elastic
  • Analytics – run SQL queries on the streams/firehose streams, and write the result to s3 /elsastic

Basic Lambda to call internal VPC api

Product team decided they wanted a specific event to happen every time a specific email address receives an email. The first option was to poll mail server and analyse all the received emails (yuck!).

The other option was to use AWS tools. In our case, forward the email to SES and call a lambda that will trigger our internal service with all the email meta-data – Which is exactly what we did.

Few pointers before the code:

  1. Lambda doesn’t load node dpendencies – so if you want to use some external packages except aws, you’d need to upload a zip with the dependencies and your code
  2. If you want to call an internal VPC resource, you need to:
    1. Assign the lambda to your VPC
    2. Assign the lambda a security group that will enable it to work from within the vpc

After you’ve assigned the Lambda to the VPC and setup the SG, the rest is easy:

const axios = require('axios'); //Loaded in the zip file!

console.log(`Url: http://${process.env.SERVICE_URL}`)

exports.handler = (event, context, callback) => {
    console.log(`Inside lambda: ${JSON.stringify(event)}` )
    
    axios.post(`http://${process.env.SERVICE_URL}`, event)
        .then((res) => {
            console.log(res);
            callback(null, res);
        })
        .catch((error) => {
            console.error(error);
            callback(error);
        });
        
    console.log(`Logging out`)    
};

You can find all the event types here: Sample Events

The SES -> Lambda invocation only send the Email’s meta-data. If you want to have the email content, you’d need to use SNS (so SES -> SNS Topic -> Lambda), but bear in mind that SNS only supports emails up to 150K, so for anything larger, you’d need to move to S3

Postgres – Logging queries

We had an issue with some JQL queries returning weird results from the db, so we wanted to see exactly what’s arriving to the psql service. To see that:

  1. Edit the config file: /var/lib/postgresql/data/postgresql.conf

  2. Unmark and change the following:

#logging_collector = off                # Enable capturing of stderr and csvlog
                                        # into log files. Required to be on for
                                        # csvlogs.
                                        # (change requires restart)

# These are only used if logging_collector is on:
#log_directory = 'pg_log'               # directory where log files are written,
                                        # can be absolute or relative to PGDATA
#log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log'        # log file name pattern,
                                        # can include strftime() escapes
[...]
log_statement = 'none'                   # none, ddl, mod, all
  • The logging_collector should be set to on to enable logging
  • The log_statement should be set to all to enable query logging
  • The log_directory and log_filename can stay the same, depends on what you want.

So your line should look like:

#logging_collector = on                # Enable capturing of stderr and csvlog
                                        # into log files. Required to be on for
                                        # csvlogs.
                                        # (change requires restart)

# These are only used if logging_collector is on:
#log_directory = 'pg_log'               # directory where log files are written,
                                        # can be absolute or relative to PGDATA
#log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log'        # log file name pattern,
                                        # can include strftime() escapes
[...]
log_statement = 'all'                   # none, ddl, mod, all

Now restart your service, and you’re good to go : the logs will be at /var/lib/postgresql/data/pg_log

Don’t run this on production, as it will seriously fuck up your performance!

How to handle runtime exceptions in Kafka Streams

We’ve been using Kafka Streams (1.1.0, Java) as the backbone of our μ-services architecture. We’ve switched to stream mainly because we wanted the exactly-once processing guarantee.

Lately we’ve been having several runtime exceptions that killed the entire stream library and our μ-service.

So the main question was – is this the way to go? After a few back and forth, we realized that the best way to test this is by checking:

  1. What would Kafka do?
  2. Do we still keep our exactly-once promise?

What does Kafka do?

This document is the Kafka Stream Architecture design. After the description of the StreamThread , StreamTask and StandbyTask, there’s a discussion about Exceptions handling, the gist of which is as follows:

First, we can distinguish between recoverable and fatal exceptions. Recoverable exception should be handled internally and never bubble out to the user. For fatal exceptions, Kafka Streams is doomed to fail and cannot start/continue to process data. […] We should never try to handle any fatal exceptions but clean up and shutdown

So, if Kafka threw a Throwable at us, it basically means that the library is doomed to fail, and won’t be able to process data. In our case, since the entire app is built around Kafka, this means killing the entire μ-service, and letting the deployment mechanism just re-deploy another one automatically.

Do we still keep our exactly-once promise?

Now we’re faced with the question whether or not this behaviour might harm our hard-earned exactly-once guarantee.
To answer that question, we first need to understand when the exactly-once is applicable.

exactly-once is applicable from the moment we’re inside the stream – meaning, our message arrived at the first topic, T1. So everything that happens before that is irrelevant: the producer who pushed the message to T1 in the first time could have failed just before sending it, and the message will never arrive (so not even at-least-once is valid) – so this is something we probably need to handle, but that doesn’t have anything to do with streams.

Now, let’s say our message, M, is already inside topic T1. Hooray!

Now we can either fail before reading it, while processing it, and after pushing it.

  • If we failed before reading it, we’re fine. The μ-service will go up again, will use the same appId, and we’ll read the message.
  • If we read it and failed before we even started processing it, we’ll never send the offset commit, so again, we’re fine.
  • If we failed during processing it, again, we’ll never reach the point of updating the offsets (because we commit the processed message together with the consumer offset – so if one didn’t happen, neither did the other)
  • If we failed after sending it – again, we’re fine: even if we didn’t get the ack, both the consumer offset and the new transformed/processed message are out.

Uncaught Exception Handlers

Kafka has 2 (non-overlapping) ways to handle uncaught exceptions:

  • KafkaStreams::setUncaughtExceptionHandler – this function allows you to register an uncaught exception handler, but it will not prevent the stream from dying: it’s only there to allow you to add behaviour to your app in case such an exception indeed happens. This is a good way to inform the rest of your app that it needs to shut itself down / send message somewhere.

  • ProductionExceptionHandler – You can implement this class and add it via the properties: StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG – but in this case, you will need to decide if the stream can keep going or not, and I feel this requires very deep understanding of the internals of the streams, and I’m still not sure when exactly you would want that.

Conclusion

For us, using k8s deployments, with n number of pods of each service being automatically scaled all the time, the best way to handle runtime/unchecked exceptions is to make sure our app goes down with the Kafka Stream library (using the KafkaStreams::setUncaughtExceptionHandler), and letting the deployment service take care of starting the app again.