New Book Day: Kubernetes Secrets Handbook

Since Kubernetes was released to the public in 2015, it has seen continuous adoption from engineers and huge progress in terms of tooling and features. Kubernetes is the most popular container orchestration platform, and this is due to various reasons:

  • It’s open source
  • Container based
  • It has a vibrant community
  • A rich ecosystem of extensions and tools
  • Ease of deployment and automation
  • Robustness
  • Scalability

A very important aspect of Kubernetes is secret management. When you get started with Kubernetes everything seems to work magically, but then you start to wonder about the security aspects.
Once you store and fetch a secret using the kubectl command, several questions come to mind.

  • Where is this secret stored?
  • Is it encrypted?
  • What are the minimum permissions needed to interact with the secrets?
  • What happens on a datacenter outage?
  • How safe am I in a disaster recovery scenario?
  • What if I want to use the secret with non-Kubernetes deployments?
  • How does my CI/CD interact with secrets?
  • How can I track any interaction with the secrets?
  • How about integrating with my cloud platform?
  • Am I limited to the etcd storage?

Secrets management on Kubernetes is a huge topic by itself. For this reason Rom Adams, Chen Xi, and I embarked on the journey of authoring this book. Our goal was to make it easier for Kubernetes users to identify the landscape around secrets management and also to assist them in the technical choices they will have to make.

The book starts with an overview of Kubernetes, its architecture and design principles, and how components like etcd contribute to secret storage. We focus on the different types of secrets and their applications in the various components of Kubernetes, for example the integration of a TLS secret with an Ingress. Another aspect tackled is securing the secrets using RBAC policies, following the principle of least privilege. Then we focus on tracking down any interactions with secrets through Kubernetes auditing.

Next, the book focuses on encrypting the secrets the Kubernetes-native way. The reader will learn about the default encryption providers that Kubernetes offers, CBC and GCM, and how Kubernetes can be configured to enable the encryption of secrets on etcd. Later we focus on hardening the system where the secrets reside physically. There is also a section on troubleshooting secret provisioning issues and common mistakes to avoid.

We also focus on more advanced concepts. We expand on security and compliance and how to address security concerns at the people, process, and technology levels. We expand on disaster recovery and backups: backup strategies to employ, tools that we can use, and disaster recovery plans for Kubernetes. As we proceed we expand more on the security risks that come with secret management, the challenges that we have to tackle at the different phases of secret management, and the mitigation strategies for security risks.

The last part is fully focused on external secret providers. We focus on the ways it is feasible to use external secret providers, such as secret injection or the utilisation of the Secrets Store CSI Driver.

We take a deep dive into cloud providers such as AWS, Azure, and GCP and their secret storage offerings. We get to deploy Kubernetes clusters to the cloud and integrate them with the available secret stores. We focus on disaster recovery capabilities and the resiliency offered by these solutions. Furthermore, we focus on observability, monitoring, and auditing of secrets in the cloud. We also make sure that we follow the principle of least privilege and provide fine-grained IAM policies. Apart from focusing on the usage of external secret providers, we also examine the usage of the Key Management Systems (KMS) provided by the cloud providers and how we can integrate them with our Kubernetes installation in order to encrypt secrets.

Next we focus on external solutions such as HashiCorp Vault and Conjur. We examine how they work behind the scenes, how they ensure the security of the secrets, as well as other important topics such as resiliency, logging, monitoring, and disaster recovery. We examine their integration with Kubernetes and how they help us when it comes to secrets management.

Finally we wrap up with case studies of secret management, CI/CD practices, and a discussion of the future of Kubernetes secrets management.

I am really proud of this book and I believe it gives lots of value to the reader. It is a great source of information on Kubernetes Secrets, but it also provides a very hands-on experience.

You can find the book on Amazon as well as on the Packt portal.

Happy reading!!!


Debezium in Embedded mode

In a previous blog we set up a Debezium server reading events from a PostgreSQL database. Then we streamed those changes to a Redis instance through a Redis stream.

We might get the impression that in order to run Debezium we need to have two extra components running in our infrastructure:

  • A standalone Debezium server instance
  • A software component with streaming capabilities and various integrations, such as Redis or Kafka

This is not always the case, since Debezium can run in embedded mode. By running in embedded mode you use Debezium in order to read directly from a database’s transaction log. It is up to you how you handle the entries retrieved. The process reading the entries from the transaction log can reside in any Java application, thus there is no need for a standalone deployment.

Apart from the reduced number of components, the other benefit is that we can alter the entries as we read them from the database and take action in our application. Sometimes we might just need a subset of the capabilities offered.

Let’s use the same PostgreSQL configuration we used previously:

listen_addresses = '*'
port = 5432
max_connections = 20
shared_buffers = 128MB
temp_buffers = 8MB
work_mem = 4MB
wal_level = logical
max_wal_senders = 3

Also we shall create an initialization script for the table we want to focus on:

#!/bin/bash
set -e
 
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
  create schema test_schema;
  create table test_schema.employee(
          id  SERIAL PRIMARY KEY,
          firstname   TEXT    NOT NULL,
          lastname    TEXT    NOT NULL,
          email       TEXT    not null,
          age         INT     NOT NULL,
          salary         real,
          unique(email)
      );
EOSQL

Our Docker Compose file will look like this:

version: '3.1'
 
services:

  postgres:
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    volumes:
      - ./postgresql.conf:/etc/postgresql/postgresql.conf
      - ./init:/docker-entrypoint-initdb.d
    command:
      - "-c"
      - "config_file=/etc/postgresql/postgresql.conf"
    ports:
      - 5432:5432

The configuration files we created are mounted to the PostgreSQL Docker container. Docker Compose V2 is out there with many good features, you can find more about it on the book I authored:
A Developer’s Essential Guide to Docker Compose
.

Provided we run docker compose up, a PostgreSQL server with a schema and a table will be up and running. That server will have logical decoding enabled, and Debezium will be able to track changes on that table through the transaction log.
We have everything needed to proceed on building our application.

First let’s add the dependencies needed:

 
    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <version.debezium>2.3.1.Final</version.debezium>
        <logback-core.version>1.4.12</logback-core.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <version>${version.debezium}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>${version.debezium}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-postgres</artifactId>
            <version>${version.debezium}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-storage-jdbc</artifactId>
            <version>${version.debezium}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback-core.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>${logback-core.version}</version>
        </dependency>
    </dependencies>

We also need to create the Debezium embedded properties:

name=embedded-debezium-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.flush.interval.ms=60000
database.hostname=127.0.0.1
database.port=5432
database.user=postgres
database.password=postgres
database.dbname=postgres
database.server.name=embedded-debezium
debezium.source.plugin.name=pgoutput
plugin.name=pgoutput
database.server.id=1234
topic.prefix=embedded-debezium
schema.include.list=test_schema
table.include.list=test_schema.employee

Apart from establishing the connection towards the PostgreSQL database, we also decided to store the offset in a file. By using the offset, Debezium keeps track of the progress made while processing the events.

On each change that happens on the table test_schema.employee we shall receive an event. Once we receive that event our codebase should handle it.
To handle the events we need to create a DebeziumEngine.ChangeConsumer. The ChangeConsumer will consume the events emitted.

package com.egkatzioura;

import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CustomChangeConsumer implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> {

    @Override
    public void handleBatch(List<RecordChangeEvent<SourceRecord>> records, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer) throws InterruptedException {
        for(RecordChangeEvent<SourceRecord> record: records) {
            System.out.println(record.record().toString());
            // mark the record as processed so that its offset gets committed
            committer.markProcessed(record);
        }
        // signal that the whole batch has been handled
        committer.markBatchFinished();
    }

}

Every incoming event will be printed on the console.
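
As mentioned earlier, we are free to transform or filter the entries as we consume them. The following is a minimal sketch of a hypothetical FilteringChangeConsumer that only reacts to insert events; it assumes the usual Debezium envelope Struct with its op and after fields and is not part of the original example.

package com.egkatzioura;

import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class FilteringChangeConsumer implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> {

    @Override
    public void handleBatch(List<RecordChangeEvent<SourceRecord>> records, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer) throws InterruptedException {
        for (RecordChangeEvent<SourceRecord> record : records) {
            Struct value = (Struct) record.record().value();
            // the envelope carries an "op" field: c=create, u=update, d=delete, r=snapshot read
            if (value != null && "c".equals(value.getString("op"))) {
                Struct after = value.getStruct("after");
                System.out.println("New employee: " + after.getString("email"));
            }
            committer.markProcessed(record);
        }
        committer.markBatchFinished();
    }

}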

Now we can add our main class where we set up the engine.

package com.egkatzioura;

import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.ChangeEventFormat;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class Application {

    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();

        try(final InputStream stream = Application.class.getClassLoader().getResourceAsStream("embedded_debezium.properties")) {
            properties.load(stream);
        }
        properties.put("offset.storage.file.filename",new File("offset.dat").getAbsolutePath());

        var engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(properties)
                .notifying(new CustomChangeConsumer())
                .build();
        engine.run();

    }

}

Provided our application is running, as well as the PostgreSQL database we configured previously, we can start inserting data:

docker exec -it debezium-embedded-postgres-1 psql postgres postgres
psql (15.3 (Debian 15.3-1.pgdg120+1))
Type "help" for help.

postgres=# insert into test_schema.employee (firstname,lastname,email,age,salary) values ('John','Doe 1','john1@doe.com',18,1234.23);

We can also see the change on the console:

SourceRecord{sourcePartition={server=embedded-debezium}, sourceOffset={last_snapshot_record=true, lsn=22518160, txId=743, ts_usec=1705916606794160, snapshot=true}} ConnectRecord{topic='embedded-debezium.test_schema.employee', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{embedded-debezium.test_schema.employee.Key:STRUCT}, value=Struct{after=Struct{id=1,firstname=John,lastname=Doe 1,email=john1@doe.com,age=18,salary=1234.23},source=Struct{version=2.3.1.Final,connector=postgresql,name=embedded-debezium,ts_ms=1705916606794,snapshot=last,db=postgres,sequence=[null,"22518160"],schema=test_schema,table=employee,txId=743,lsn=22518160},op=r,ts_ms=1705916606890}, valueSchema=Schema{embedded-debezium.test_schema.employee.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

We did it. We managed to run Debezium through a Java application without the need for a standalone Debezium server or a streaming component. You can find the code on GitHub.

Avro Schema Generate and Use

If you implement streaming pipelines, chances are that you use Apache Avro.
Since Avro is a popular choice for serializing data, it is widely supported by streaming tools and vendors. Also, schema registries are available in order to help with schema evolution.

Let’s run a basic Avro example.

It all starts with creating the schema in an avsc file. The goal is to send request metrics for an HTTP endpoint.

{
  "namespace": "com.egkatzioura.avro.model",
  "name": "RequestMetric",
  "type" : "record",
  "fields" : [
    {
      "name": "endpoint",
      "type" : ["null","string"],
      "default": null

    },
    {
      "name" : "status",
      "type" : ["null","int"],
      "default": null
    },
    {
      "name" : "error_message",
      "type" : ["null", "string"],
      "default": null
    },
    {
      "name" : "created_at",
      "type": "long",
      "logicalType" : "timestamp-millis"
    }
  ]
}

If the fields in a record are nullable we need to specify so in the schema with ["null", "string"]. Also, we want to send a timestamp, thus we shall use a logicalType. A logical type annotates an underlying type, in our case a long. By using the logicalType attribute we provide additional semantic meaning to that type.

We will create the directory src/main/avro and place the file under the name request_metric.avsc.

Provided we use Maven, in order to generate the class files we need to have certain plugins included.


    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.11.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.11.1</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

As we can see, we specified where the schemas are placed within the project by using the sourceDirectory configuration. By using the outputDirectory configuration we specify where the generated classes will be placed.

By running mvn generate-sources, the class RequestMetric will be generated.
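
The generated class comes with plain setters, as used below, and typically also exposes a builder; a small sketch of the builder style (assuming the defaults of the avro-maven-plugin):

RequestMetric metric = RequestMetric.newBuilder()
        .setEndpoint("/a")
        .setStatus(200)
        .setCreatedAt(System.currentTimeMillis())
        .build();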

Let’s create and read an Avro file.

import com.egkatzioura.avro.model.RequestMetric;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

import java.io.File;
import java.io.IOException;


public class Application {

    public static void main(String[] args) throws IOException {
        RequestMetric a = new RequestMetric();
        a.setEndpoint("/a");
        a.setStatus(200);
        a.setCreatedAt(System.currentTimeMillis());

        RequestMetric b = new RequestMetric();
        b.setEndpoint("/b");
        b.setStatus(201);
        b.setCreatedAt(System.currentTimeMillis());

        File file = new File("metric.avro");

        SpecificDatumWriter<RequestMetric> datumWriter = new SpecificDatumWriter<>(RequestMetric.class);

        try(DataFileWriter<RequestMetric> dataFileWriter = new DataFileWriter<>(datumWriter)) {
            dataFileWriter.create(a.getSchema(), file);
            dataFileWriter.append(a);
            dataFileWriter.append(b);
        }

        DatumReader<RequestMetric> datumReader = new SpecificDatumReader<>(RequestMetric.class);
        DataFileReader<RequestMetric> dataFileReader = new DataFileReader<>(file, datumReader);
        RequestMetric requestMetric= null;
        while (dataFileReader.hasNext()) {
            requestMetric = dataFileReader.next(requestMetric);
            System.out.println(requestMetric);
        }

    }
}

We wrote the Avro file and also read from it. We don’t have to serialize our data into a file; we can also do so in memory.

package com.egkatzioura.avro;

import com.egkatzioura.avro.model.RequestMetric;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.*;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

import java.io.*;


public class InMemoryExample {

    public static void main(String[] args) throws IOException {
        RequestMetric a = new RequestMetric();
        a.setEndpoint("/a");
        a.setStatus(200);
        a.setCreatedAt(System.currentTimeMillis());

        RequestMetric b = new RequestMetric();
        b.setEndpoint("/b");
        b.setStatus(201);
        b.setCreatedAt(System.currentTimeMillis());

        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().directBinaryEncoder(outputStream, null);
        SpecificDatumWriter<RequestMetric> datumWriter = new SpecificDatumWriter<>(RequestMetric.class);

        datumWriter.write(a, encoder);
        datumWriter.write(b, encoder);
        encoder.flush();

        outputStream.close();
        byte[] bytes = outputStream.toByteArray();

        DatumReader<RequestMetric> datumReader = new SpecificDatumReader<>(RequestMetric.class);

        datumReader.setSchema(a.getSchema());


        try(ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes)) {
            Decoder decoder = DecoderFactory.get().directBinaryDecoder(inputStream, null);

            while(true){
                try {
                    RequestMetric record = datumReader.read(null, decoder);
                    System.out.println(record);
                } catch (EOFException eof) {
                    break;
                }
            }
        }
    }

}

That’s all for now; we specified an Avro schema, generated the model, and read and wrote Avro records.

New Book Day: Modern API Development with Spring 6 and Spring Boot 3

The holiday season is close and it is time to resume and enhance my reading backlog. I wanted to get a break from infrastructure related topics and revisit some of my Spring REST API days.
I picked this book and it is a real treat. It focuses mainly on REST APIs but includes gRPC as well as GraphQL.
Obviously APIs are no longer just a hot topic but an essential one. Unless you live under a rock, your daily work includes integrating with an API or involves something that integrates with an API.
You start with designing a REST API using the OpenAPI specification and then work towards the implementation of the REST API.
Then the book proceeds smoothly to consuming the API by implementing a front-end application, testing the API, containerising it, deploying it, as well as monitoring it. This gives an end-to-end experience of implementing REST APIs with Spring.

As expected, the essential topics such as security, JWT, HATEOAS, database integrations, REST API best practices, testing, and deploying to production are covered. While going through the topics, I liked the opportunities the author gives to get back to the basics. There is a focus on designing REST APIs, but for each technical aspect involved there is a deep dive, like learning more about Spring, IoC containers, annotations, configuration modularisation, JPA, as well as key components of Spring that help towards a REST API implementation.
Apart from that, the book does not stick only to REST API implementation; GraphQL and gRPC are also included.

For me a winner is the extras that come with the book.
For example you get to use Project Reactor and WebFlux. Flyway for DB migrations is also included, as is monitoring using the ELK stack. You even get to integrate with Kubernetes using minikube.
Overall this book is complete, and whether you are a seasoned professional or just getting started with APIs in Spring, it’s a solid source of information.

In the next version it would be awesome to see topics such as rate limiting, mTLS, REST client implementation practices, and a TDD approach in the examples.

Shoutout to Sourabh Sharma. Thank you for this book!

Spring Webflux Retries

If you use Spring WebFlux you probably want your requests to be more resilient. In this case we can just use the retries that come packaged with the WebFlux library.
There are various cases that we can take into account:

  • too many requests to the server
  • an internal server error
  • unexpected format
  • server timeout

We will create test cases for those scenarios using MockWebServer.

We shall add the WebFlux and the MockWebServer to a project:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
            <version>2.7.15</version>
        </dependency>

        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>mockwebserver</artifactId>
            <version>4.11.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
            <version>3.5.9</version>
        </dependency>

Let’s check the scenario of too many requests on the server. In this scenario our request fails because the server will not fulfil it. The server is still functional, however, and on another request chances are we shall receive a proper response.

import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.SocketPolicy;
import org.junit.jupiter.api.Test;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

class WebFluxRetry {

    @Test
    void testTooManyRequests() throws IOException {
        MockWebServer server = new MockWebServer();
        MockResponse tooManyRequests = new MockResponse()
                .setBody("Too Many Requests")
                .setResponseCode(429);
        MockResponse successfulRequests = new MockResponse()
                .setBody("successful");

        server.enqueue(tooManyRequests);
        server.enqueue(tooManyRequests);
        server.enqueue(successfulRequests);
        server.start();

        WebClient webClient = WebClient.builder()
                .baseUrl("http://" + server.getHostName() + ":" + server.getPort())
                .build();

        Mono<String> result = webClient.get()
                .retrieve()
                .bodyToMono(String.class)
                .retry(2);

        StepVerifier.create(result)
                .expectNextMatches(s -> s.equals("successful"))
                .verifyComplete();

        server.shutdown();
    }
}

We used MockWebServer in order to enqueue responses. Essentially the responses we placed on the mock server will be consumed, one per request we make. The first two responses are failed 429 responses from the server.
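
Note that a plain retry(2) retries on any error. If we only wanted to retry on 429 responses, and with a delay between attempts, a variation could look like the following sketch using Reactor’s Retry.backoff and Spring’s WebClientResponseException (this is not part of the original test):

        // requires: import reactor.util.retry.Retry;
        // requires: import org.springframework.web.reactive.function.client.WebClientResponseException;
        Mono<String> result = webClient.get()
                .retrieve()
                .bodyToMono(String.class)
                .retryWhen(Retry.backoff(2, Duration.ofMillis(100))
                        .filter(throwable -> throwable instanceof WebClientResponseException.TooManyRequests));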

Let’s check the case of 5xx responses. A 5xx can be caused by various reasons. Usually if we face a 5xx there is probably a problem in the server codebase. However, in some cases a 5xx might come as a result of an unstable service that regularly restarts, a server deployed in an availability zone that faces network issues, or even a failed rollout which is not fully in effect. In this case a retry makes sense. By retrying, the request may be routed to the next server behind the load balancer.
We shall try a request that has a bad status:

    @Test
    void test5xxResponse() throws IOException {
        MockWebServer server = new MockWebServer();
        MockResponse tooManyRequests = new MockResponse()
                .setBody("Server Error")
                .setResponseCode(500);
        MockResponse successfulRequests = new MockResponse()
                .setBody("successful");

        server.enqueue(tooManyRequests);
        server.enqueue(tooManyRequests);
        server.enqueue(successfulRequests);
        server.start();

        WebClient webClient = WebClient.builder()
                .baseUrl("http://" + server.getHostName() + ":" + server.getPort())
                .build();

        Mono<String> result = webClient.get()
                .retrieve()
                .bodyToMono(String.class)
                .retry(2);

        StepVerifier.create(result)
                .expectNextMatches(s -> s.equals("successful"))
                .verifyComplete();

        server.shutdown();
    }

A response with the wrong format can also happen if an application goes haywire:

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    private static class UsernameResponse {
        private String username;
    }

    @Test
    void badFormat() throws IOException {
        MockWebServer server = new MockWebServer();
        MockResponse tooManyRequests = new MockResponse()
                .setBody("Plain text");
        MockResponse successfulRequests = new MockResponse()
                .setBody("{\"username\":\"test\"}")
                .setHeader("Content-Type","application/json");

        server.enqueue(tooManyRequests);
        server.enqueue(tooManyRequests);
        server.enqueue(successfulRequests);
        server.start();

        WebClient webClient = WebClient.builder()
                .baseUrl("http://" + server.getHostName() + ":" + server.getPort())
                .build();

        Mono<UsernameResponse> result = webClient.get()
                .retrieve()
                .bodyToMono(UsernameResponse.class)
                .retry(2);

        StepVerifier.create(result)
                .expectNextMatches(s -> s.getUsername().equals("test"))
                .verifyComplete();

        server.shutdown();
    }

If we break it down, we created two responses with a plain text format. Those responses would be rejected since they cannot be mapped to the UsernameResponse object. Thanks to the retries we managed to get a successful response.

Our last test tackles the case of a timeout:

    @Test
    void badTimeout() throws IOException {
        MockWebServer server = new MockWebServer();
        MockResponse delayedResponse = new MockResponse()
                .setBody("Plain text")
                .setSocketPolicy(SocketPolicy.DISCONNECT_DURING_RESPONSE_BODY)
                .setBodyDelay(10000, TimeUnit.MILLISECONDS);
        MockResponse successfulRequests = new MockResponse()
                .setBody("successful");

        server.enqueue(delayedResponse);
        server.enqueue(successfulRequests);
        server.start();

        WebClient webClient = WebClient.builder()
                .baseUrl("http://" + server.getHostName() + ":" + server.getPort())
                .build();

        Mono<String> result = webClient.get()
                .retrieve()
                .bodyToMono(String.class)
                .timeout(Duration.ofMillis(5_000))
                .retry(1);

        StepVerifier.create(result)
                .expectNextMatches(s -> s.equals("successful"))
                .verifyComplete();

        server.shutdown();
    }

That’s it! Thanks to retries, our codebase was able to recover from failures and become more resilient. Also we used MockWebServer, which can be very handy for simulating these scenarios.

Java Concurrency: ReadWriteLock

Imagine a scenario where we want to distinguish write threads and read threads.

  • As we execute a write we want isolation
  • As we execute a read we don’t want data to change
  • Read operations do not block other read operations

From a writer’s perspective we want to achieve isolation and consistency. From a reader’s perspective we want to be able to read results that are consistent, we want to take advantage of the Java memory model, and we don’t want to prevent other readers from reading.
The ReadWriteLock can fulfil the above.

A ReadWriteLock maintains a pair of associated locks, one for read-only operations and one for writing. The read lock may be held simultaneously by multiple reader threads, so long as there are no writers. The write lock is exclusive.
All ReadWriteLock implementations must guarantee that the memory synchronization effects of writeLock operations (as specified in the Lock interface) also hold with respect to the associated readLock. That is, a thread successfully acquiring the read lock will see all updates made upon previous release of the write lock.

Let’s examine the interface

public interface ReadWriteLock {
    /**
     * Returns the lock used for reading.
     *
     * @return the lock used for reading
     */
    Lock readLock();

    /**
     * Returns the lock used for writing.
     *
     * @return the lock used for writing
     */
    Lock writeLock();
}

In both cases we receive back the lock interface which we saw previously.

We have two implementations of a ReadWriteLock, the ReentrantReadWriteLock and the ReadWriteLockView; we will examine them further.
For now we shall use the ReentrantReadWriteLock implementation.

Let’s see some usage scenarios using Threads and Runnables.
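
The test snippets below assume a shared lock field on the test class, along the lines of the following sketch (the declaration is not shown in the original excerpts):

    // java.util.concurrent.locks.ReadWriteLock and ReentrantReadWriteLock
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();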

We have the write and read lock acquisition scenarios:

    private int counter = 0;

    @Test
    void testWrite() {
        Lock lock = readWriteLock.writeLock();

        lock.lock();

        try {
            counter++;
        } finally {
            lock.unlock();
        }
    }

    @Test
    void testRead() {
        Lock lock = readWriteLock.readLock();

        lock.lock();

        try {
            log.info("Counter is {}", counter);
        } finally {
            lock.unlock();
        }
    }

The next scenario is read first and then try to perform a write:

    private int counter = 0;

    @Test
    void testReadThenWrite() throws InterruptedException {
        Thread readThread = new Thread(() -> {
            int readValue;
            Lock lock = readWriteLock.readLock();

            lock.lock();

            try {
                readValue = counter;
                log.info("Read successfully [{}]",readValue);
                sleep();
            } finally {
                lock.unlock();
            }
        });

        Thread writeThread = new Thread(() -> {
            Lock lock = readWriteLock.writeLock();
            lock.lock();

            try {
                counter++;
                log.info("wrote successfully [{}]",counter);
            } finally {
                lock.unlock();
            }

        });

        readThread.start();
        Thread.sleep(1000L);
        writeThread.start();
        writeThread.join();
        readThread.join();
    }

    private static void sleep() {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

In this case we can see that the write operation waited for the read operation to finish.

08:40:31.686 [Thread-0] INFO com.gkatzioura.concurrency.lock.readwrite.ReadWriteShowcase - Read successfully [0]
08:40:41.693 [Thread-1] INFO com.gkatzioura.concurrency.lock.readwrite.ReadWriteShowcase - wrote successfully [1]

The other scenario is write first and then try to perform a read:

    private int counter = 0;

    @Test
    void testWriteThenRead() throws InterruptedException {
        Thread readThread = new Thread(() -> {
            int readValue;
            Lock lock = readWriteLock.readLock();

            lock.lock();

            try {
                readValue = counter;
                log.info("Read successfully [{}]",readValue);
            } finally {
                lock.unlock();
            }
        });

        Thread writeThread = new Thread(() -> {
            Lock lock = readWriteLock.writeLock();
            lock.lock();

            try {
                counter++;
                log.info("wrote successfully [{}]",counter);
                sleep();
            } finally {
                lock.unlock();
            }

        });

        writeThread.start();
        Thread.sleep(1000L);
        readThread.start();

        writeThread.join();
        readThread.join();
    }

    private static void sleep() {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

As expected the read operation waited for the write operation to finish:

08:42:21.646 [Thread-1] INFO com.gkatzioura.concurrency.lock.readwrite.ReadWriteShowcase - wrote successfully [1]
08:42:31.652 [Thread-0] INFO com.gkatzioura.concurrency.lock.readwrite.ReadWriteShowcase - Read successfully [1]

The last scenario will perform multiple reads. ReadWriteLock has the feature where the read lock can be held by multiple readers.

    @Test
    void testMultipleReads() throws InterruptedException {
        Thread readThread1 = new Thread(() -> {
            int readValue;
            Lock lock = readWriteLock.readLock();

            lock.lock();

            try {
                readValue = counter;
                log.info("Read successfully [{}]",readValue);
                sleep();
            } finally {
                lock.unlock();
            }
        });
        readThread1.setName("r-1");

        Thread readThread2 = new Thread(() -> {
            int readValue;
            Lock lock = readWriteLock.readLock();

            lock.lock();

            try {
                readValue = counter;
                log.info("Read successfully [{}]",readValue);
            } finally {
                lock.unlock();
            }

        });
        readThread2.setName("r-2");

        readThread1.start();
        Thread.sleep(1000L);
        readThread2.start();

        readThread1.join();
        readThread2.join();
    }

In both cases the response was immediate since there was no blocking at all.

08:46:05.018 [r-1] INFO com.gkatzioura.concurrency.lock.readwrite.ReadWriteShowcase - Read successfully [0]
08:46:06.022 [r-2] INFO com.gkatzioura.concurrency.lock.readwrite.ReadWriteShowcase - Read successfully [0]

From the above scenarios we can get an idea of where the ReentrantReadWriteLock can be useful. The predominant scenario is when we have primarily read operations and can take advantage of multiple readers getting hold of the read lock. In another blog we will do some benchmarks and examine its usage further.

Use JMH for your Java applications with Maven

Previously we had an example of using JMH with Gradle for our benchmarks.
Since Maven is the most popular build tool for Java, it is worth setting up an example.

Let’s add the dependencies to our project:


    <dependencies>
        <dependency>
            <groupId>org.openjdk.jmh</groupId>
            <artifactId>jmh-core</artifactId>
            <version>1.36</version>
        </dependency>
        <dependency>
            <groupId>org.openjdk.jmh</groupId>
            <artifactId>jmh-generator-annprocess</artifactId>
            <version>1.36</version>
        </dependency>
    </dependencies>

Let’s add a simple benchmark for Array initialization:

import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@BenchmarkMode(Mode.All)
public class ArrayInitializationBenchmark {

  @Benchmark
  public Integer[] initialize() {
    return new Integer[256];
  }

}

We can run the benchmark in various ways. A main class, org.openjdk.jmh.Main, is provided. This class can be used to pass arguments and execute the benchmarks of interest. So what we can do is use the Maven Shade plugin and generate a binary just for the benchmarks.

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <finalName>jmh-benchmarks</finalName>
                            <transformers>
                                <transformer
                                  implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.openjdk.jmh.Main</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

By running

mvn clean install

We shall have two jars generated. The jar jmh-benchmarks.jar is the one of interest, since it comes configured with org.openjdk.jmh.Main as its main class.

Running the benchmark is simple

java -jar target/jmh-benchmarks.jar ArrayInitializationBenchmark
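
The org.openjdk.jmh.Main class also accepts JMH’s command line options, so we can tune a run without touching the code. For example (the values below are illustrative), -f controls the forks, -wi the warmup iterations, and -i the measurement iterations:

java -jar target/jmh-benchmarks.jar ArrayInitializationBenchmark -f 1 -wi 3 -i 5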

If we need more customisation, we can also create our own main method and define the benchmarks that we shall pass to the JMH Runner.

package com.gkatzioura.concurrency.benchmark;

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class BenchmarkMain {

  public static void main(String[] args) throws RunnerException {
    Options opt = new OptionsBuilder().include(ArrayInitializationBenchmark.class.getSimpleName()).build();
    new Runner(opt).run();
  }

}

The results will be printed on console

Benchmark                                                     Mode      Cnt     Score   Error   Units
ArrayInitializationBenchmark.initialize                      thrpt       25    10.050 ± 1.045  ops/us
ArrayInitializationBenchmark.initialize                       avgt       25     0.116 ± 0.011   us/op
ArrayInitializationBenchmark.initialize                     sample  8122862     0.147 ± 0.003   us/op
ArrayInitializationBenchmark.initialize:initialize·p0.00    sample                ≈ 0           us/op
ArrayInitializationBenchmark.initialize:initialize·p0.50    sample              0.125           us/op
ArrayInitializationBenchmark.initialize:initialize·p0.90    sample              0.208           us/op
ArrayInitializationBenchmark.initialize:initialize·p0.95    sample              0.250           us/op
ArrayInitializationBenchmark.initialize:initialize·p0.99    sample              0.417           us/op
ArrayInitializationBenchmark.initialize:initialize·p0.999   sample              1.374           us/op
ArrayInitializationBenchmark.initialize:initialize·p0.9999  sample             16.608           us/op
ArrayInitializationBenchmark.initialize:initialize·p1.00    sample           2179.072           us/op
ArrayInitializationBenchmark.initialize                         ss        5     2.992 ± 0.593   us/op

We have all the benchmark modes enabled, thus each one of them is listed: Throughput, Average, Single Shot, and Sample.
For a more detailed view of the benchmarks and the options available, you can check the previous blog on JMH.

That’s it! Happy benchmarking!

Java Concurrency: Condition

Previously we checked on ReentrantLock and its fairness. One of the things we can stumble upon is the creation of a Condition. By using a Condition we can create mechanisms that allow threads to wait for specific conditions to be met before proceeding with their execution.


The Condition interface provides the following methods:


public interface Condition {

    void await() throws InterruptedException;

    void awaitUninterruptibly();

    long awaitNanos(long nanosTimeout) throws InterruptedException;

    boolean await(long time, TimeUnit unit) throws InterruptedException;

    boolean awaitUntil(Date deadline) throws InterruptedException;

    void signal();

    void signalAll();
}

The closest we have come to that so far is the wait method of the Object monitor.
A Condition is bound to a Lock, and a thread cannot interact with a condition and its methods if it does not hold that lock.
Also, Condition uses the underlying lock mechanisms; for example, signal and signalAll will use the underlying queue of threads maintained by the Lock and notify them to wake up.

One of the obvious things to implement using Conditions is a BlockingQueue: worker threads processing data and publisher threads dispatching data. Data is published to a queue, worker threads process data from the queue, and they should wait if there is no data in the queue.

For a worker thread, if the condition is met the flow is the following:

  • Acquire the lock
  • Check the condition
  • Process Data
  • Release the lock

If the condition is not met, the flow would slightly change to this:

  • Acquire the lock
  • Check the condition
  • Wait until the condition is met.
  • Re-acquire the lock
  • Process Data
  • Release the lock

The publisher thread, whenever it adds a message, should notify the threads waiting on the condition.

The workflow would be like this.

  • Acquire the lock
  • Publish data
  • Notify the workers
  • Release the lock


Obviously this functionality already exists through the BlockingQueue interface and the LinkedBlockingDeque and ArrayBlockingQueue implementations.
We will proceed with an implementation for the sake of the example.
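
For comparison, the built-in equivalent is just a few lines; a minimal sketch using ArrayBlockingQueue:

        // java.util.concurrent.BlockingQueue and ArrayBlockingQueue
        BlockingQueue<String> builtIn = new ArrayBlockingQueue<>(10);
        builtIn.put("a message");        // blocks while the queue is full, may throw InterruptedException
        String message = builtIn.take(); // blocks while the queue is empty, may throw InterruptedException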

Let’s see the Message Queue:

package com.gkatzioura.concurrency.lock.condition;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MessageQueue<T> {

    private Queue<T> queue = new LinkedList<>();
    private Lock lock = new ReentrantLock();
    private Condition hasMessages = lock.newCondition();

    public void publish(T message) {
        lock.lock();
        try {
            queue.offer(message);
            hasMessages.signal(); 
        } finally {
            lock.unlock();
        }
    }

    public T receive() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                hasMessages.await();
            }
            return queue.poll();
        } finally {
            lock.unlock();
        }
    }

}

Now let’s put it into action:

    MessageQueue<String> messageQueue = new MessageQueue<>();

    @Test
    void testPublish() throws InterruptedException {
        Thread publisher = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                String message = "Sending message num: " + i;
                log.info("Sending [{}]", message);
                messageQueue.publish(message);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        Thread worker1 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    String message = messageQueue.receive();
                    log.info("Received: [{}]", message);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        Thread worker2 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    String message = messageQueue.receive();
                    log.info("Received: [{}]", message);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        publisher.start();
        worker1.start();
        worker2.start();

        publisher.join();
        worker1.join();
        worker2.join();
    }

That’s it! Our workers processed the expected messages and waited when the queue was empty.

Java Concurrency: ReentrantLock Fairness

Previously we saw some of the building blocks of concurrency in Java. In this blog we will focus on ReentrantLock. ReentrantLock is based on the AbstractQueuedSynchronizer.

By using the ReentrantLock we allow a thread to acquire a lock and use it multiple times. As the name suggests, it provides us with reentrancy, just like synchronized blocks.
Because ReentrantLock is implemented using the AbstractQueuedSynchronizer we can have fairness. The threads that try to acquire the lock will wait in a FIFO fashion and the algorithm will try to ensure fairness.

If we check our previous blog on locks we can see that locks are featureful.

If a thread holds the lock, another thread cannot unlock it; only the thread holding it is able to unlock it:

    @Test
    void alreadyAcquired() throws Exception {
        final ReentrantLock reentrantLock = new ReentrantLock();

        Thread thread = new Thread(() -> {
            log.info("Try first time");
            reentrantLock.lock();
            log.info("Got the lock");
        });

        thread.start();
        thread.join();
        assertThrows(IllegalMonitorStateException.class, reentrantLock::unlock);
    }

Also, as the name suggests, let’s see the reentrancy. In the following example the same thread will try to acquire the lock it already holds multiple times.

    @Test
    void reentrancy() throws Exception {
        final ReentrantLock reentrantLock = new ReentrantLock();

        Thread thread = new Thread(() -> {
            log.info("Try first time");
            reentrantLock.lock();
            log.info("Got the lock");

            log.info("lock one more time");
            reentrantLock.lock();
            log.info("locked again");
            reentrantLock.unlock();
        });

        thread.start();
        thread.join();
    }

In the next example we will construct a ReentrantLock with fairness enabled. Let’s do a small test of ReentrantLock’s fairness.
5 threads will try to acquire the lock simultaneously. Then we will see how they achieved this and their order of acquiring the lock.

    @AllArgsConstructor
    private class CheckFairnessRunnable implements Runnable {

        private final int sleepTime;
        private final Lock lock;

        @Override
        public void run() {
            try {
                Thread.sleep(sleepTime);
                log.info("acquiring lock");
                lock.lock();
                log.info("lock acquired");
                Thread.sleep(5000);
                lock.unlock();
                log.info("lock unlocked");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Test
    void fairness() throws Exception {
        int numberOfThreads = 5;
        ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);

        final ReentrantLock reentrantLock = new ReentrantLock(true);

        for (int i = 0; i < numberOfThreads; i++) {
            Runnable runnable = new CheckFairnessRunnable(i*100, reentrantLock);
            executor.execute(runnable);
        }

        executor.shutdown();
        executor.awaitTermination(60, TimeUnit.SECONDS);
    }

If we examine the code and the CheckFairnessRunnable, each thread initially sleeps. This is done on purpose so the threads don’t arrive at the lock at the same time. The time a thread spends once it acquires the lock is high on purpose, so we make sure all the other threads will wait for the lock.

Let’s check the output:

09:07:43.782 [pool-1-thread-1] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - acquiring lock
09:07:43.788 [pool-1-thread-1] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock acquired
09:07:43.882 [pool-1-thread-2] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - acquiring lock
09:07:43.985 [pool-1-thread-3] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - acquiring lock
09:07:44.085 [pool-1-thread-4] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - acquiring lock
09:07:44.185 [pool-1-thread-5] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - acquiring lock
09:07:48.797 [pool-1-thread-2] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock acquired
09:07:48.796 [pool-1-thread-1] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock unlocked
09:07:53.802 [pool-1-thread-3] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock acquired
09:07:53.802 [pool-1-thread-2] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock unlocked
09:07:58.807 [pool-1-thread-4] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock acquired
09:07:58.806 [pool-1-thread-3] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock unlocked
09:08:03.813 [pool-1-thread-5] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock acquired
09:08:03.813 [pool-1-thread-4] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock unlocked
09:08:08.819 [pool-1-thread-5] INFO com.gkatzioura.concurrency.lock.ReentrantLockTest - lock unlocked

We can see that the threads that were first to try to acquire the lock were also the ones that got the lock, so essentially the order was preserved.

That’s it. In the next blog we will check the Condition interface.

Java Concurrency: The AbstractQueuedSynchronizer

Previously we worked with one of the building blocks of Java concurrency, the LockSupport class. In this blog we will have a look at the AbstractQueuedSynchronizer.
The AbstractQueuedSynchronizer is one of the building blocks of Java synchronization. Lock implementations such as ReentrantLock and Semaphore are based on the AbstractQueuedSynchronizer.
An integer represents the state. What the state represents varies based on the use case; for example, it can represent how many permits are available. Operations that change the state are guarded by a FIFO queue, which tries to operate in a fair manner and provide access to the lock in the order in which the threads arrived.


The FIFO queue is implemented using a nested Node class. A Node represents a Thread waiting in the queue. The operations are atomic and the variables are volatile. It contains references to the previous and the next node.

    abstract static class Node {
        volatile Node prev;       // initially attached via casTail
        volatile Node next;       // visibly nonnull when signallable
        Thread waiter;            // visibly nonnull when enqueued
        volatile int status;      // written by owner, atomic bit ops by others

...
    }

An instance of the AbstractQueuedSynchronizer, since it provides a FIFO, will have a reference to the head and the tail in the class:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
...
    private transient volatile Node head;
    private transient volatile Node tail;
...
}

As mentioned, we have the state variable:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
...
    private volatile int state;
...
}

The state variable can be used to represent the resources available or allocated.

When it comes to ReentrantLock it represents the number of holds on the lock:

public class ReentrantLock implements Lock, java.io.Serializable {
...
    static final class FairSync extends Sync {
...
        final boolean initialTryLock() {
...
            } else if (getExclusiveOwnerThread() == current) {
                if (++c < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(c);
                return true;
            }
...
    }
...
}

When it comes to the Semaphore it represents the permits:

public class Semaphore implements java.io.Serializable {
...
    static final class FairSync extends Sync {
...
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
...
   }
...
}
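
From the caller’s point of view these permits are exactly what acquire and release manipulate; a small illustration (not taken from the JDK source):

        Semaphore semaphore = new Semaphore(2); // state starts at 2 permits
        semaphore.acquire();                    // state becomes 1, may throw InterruptedException
        semaphore.acquire();                    // state becomes 0, a third acquire would block
        semaphore.release();                    // state goes back to 1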

Since the AbstractQueuedSynchronizer supports a shared mode as well as an exclusive mode, there are two Node implementations to represent them.

    static final class ExclusiveNode extends Node { }
    static final class SharedNode extends Node { }

For example, ReentrantLock operates in exclusive mode while Semaphore operates in shared mode.

The acquire method is where the logic lies.

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

The implementation of acquiring the lock can vary across JDK versions, but the objectives are the same.
The first step is for the thread to try and acquire the lock if it is available, without blocking.
This can vary between the shared and exclusive modes.
If the lock is held, the thread is blocked and added to the waiting queue.

    final boolean acquireQueued(final Node node, int arg) {
        boolean interrupted = false;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node))
                    interrupted |= parkAndCheckInterrupt();
            }
        } catch (Throwable t) {
...
        }
    }

As the lock gets released by the thread holding it, the threads in the queue are signaled to proceed and acquire the lock.
The mechanism varies between the shared and exclusive modes.
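
For the exclusive case, the release path in older JDK versions looks roughly like the following (a simplified sketch; recent JDKs have restructured this code):

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h); // wake up the next waiting thread in the FIFO queue
            return true;
        }
        return false;
    }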

Now that we know more about the AbstractQueuedSynchronizer, we can check its implementations and find out more about its usage.