Overview

  • In this primer, we will explore the concept of concurrency in computer science, providing examples in both Java and Python. Concurrency is a crucial topic, particularly in the context of machine learning (ML) infrastructure, where efficient resource management and parallel processing are key to handling large-scale data and computational tasks.
  • Below, we see the overall lifecycle of a thread (source)

Understanding Multithreading and Multitasking

  • Multithreading: This technique involves running multiple threads within a single application, allowing different tasks to be executed simultaneously. In a machine learning context, multithreading can be used to parallelize operations such as data preprocessing, model training, and real-time data analysis. For example, while one thread handles data loading, another can preprocess the data, and yet another can perform model evaluation. This parallelism not only speeds up the computation but also optimizes CPU usage, especially during I/O-bound tasks like reading from disk or network operations.

Multithreading Example

  • Source: YouTube Video

  • Multitasking: This involves running multiple applications concurrently, each potentially comprising several threads. In ML infrastructure, multitasking enables the simultaneous execution of various services, such as data ingestion, model serving, and monitoring. This concurrent execution is vital in production environments where real-time inference and continuous integration/deployment (CI/CD) processes must run smoothly and efficiently.

  • Concurrency in ML Infrastructure: Concurrency allows ML infrastructure to efficiently handle multiple simultaneous tasks, such as data ingestion, preprocessing, model training, and serving. AWS components like EC2, Lambda, and SageMaker manage these tasks concurrently, allowing the infrastructure to scale based on demand.

  • In the following sections, we will delve deeper into these concepts, exploring their implications and applications in modern computing, particularly in the context of machine learning systems.

Threads or Platform Thread

  • Create a New Thread and Start It:
    • To create a new thread in Java, you can define a class that extends Thread and override the run method. This method contains the code that should run in the new thread. The thread is started using the start method, which in turn calls the run method.
    class MyThread extends Thread {
        public void run() {
            System.out.println("Thread is running.");
        }
    }
    
    public class Main {
        public static void main(String[] args) {
            MyThread t1 = new MyThread();
            t1.start();
        }
    }
    

    Pros:

    • Simple and straightforward to use.
    • Direct access to the Thread class methods.

    Cons:

    • Extending Thread means the class cannot extend any other class, which limits flexibility.
    • Not recommended for large projects where multiple inheritance might be required.
  • Java Lambda Expression Code:
    • In Java 8 and later, you can use lambda expressions with the Runnable interface to define the task for a thread. This approach is more concise and readable, especially for simple tasks.
    Runnable r = () -> {
        System.out.println("Thread using lambda expression is running.");
    };
    new Thread(r).start();
    

    Pros:

    • More concise and easier to read, especially for short tasks.
    • Does not require creating a separate class.

    Cons:

    • Less control over thread-specific methods and behaviors.
    • Not ideal for complex threading logic where more control is needed.
  • Java Subclass of Thread:
    • Similar to the first method, but here we emphasize creating a subclass specifically for threading logic.
    class MyThreadSubclass extends Thread {
        public void run() {
            System.out.println("Thread subclass is running.");
        }
    }
    
    public class Main {
        public static void main(String[] args) {
            MyThreadSubclass t1 = new MyThreadSubclass();
            t1.start();
        }
    }
    

    Pros:

    • Encapsulates thread logic within a specialized subclass.
    • Useful for creating complex thread-specific behaviors.

    Cons:

    • Same limitations as extending the Thread class directly, such as single inheritance.
  • Java Anonymous Class:
    • This method involves using an anonymous inner class to define the thread’s behavior, providing a quick way to create a thread without needing a named class.
    Thread t = new Thread() {
        public void run() {
            System.out.println("Thread using anonymous class is running.");
        }
    };
    t.start();
    

    Pros:

    • Useful for quick and temporary threads.
    • No need for a separate class file.

    Cons:

    • Can become messy and hard to read with more complex logic.
    • Limited reusability and difficult to debug.
  • Java Runnable Interface Implementation:
    • Implementing the Runnable interface is the most flexible and recommended approach, especially in large projects. The Runnable interface defines a single method, run, which is implemented to contain the code to be executed in the thread.
    class MyRunnable implements Runnable {
        public void run() {
            System.out.println("Runnable thread is running.");
        }
    }
    
    public class Main {
        public static void main(String[] args) {
            MyRunnable r = new MyRunnable();
            Thread t = new Thread(r);
            t.start();
        }
    }
    

    Pros:

    • Does not restrict inheritance, allowing the class to extend other classes if needed.
    • Decouples task logic from thread management, making it more reusable and easier to manage.

    Cons:

    • Slightly more verbose than using lambda expressions for simple tasks.
  • The choice of method depends on the specific needs of your application:

  • For simple and quick implementations, lambda expressions or anonymous classes are convenient.
  • For more complex threading logic or where thread-specific behaviors are needed, subclassing Thread might be appropriate.
  • The Runnable interface is generally recommended for its flexibility and separation of concerns, especially in larger projects or when needing to extend other classes.

  • Overall, the Runnable interface approach is often considered the best practice due to its flexibility and maintainability.

Start Multiple Threads

  • Java Code for 2 Threads with Runnable:
    • Using the Runnable interface, you can create multiple threads by defining tasks within the run method.
    class Task implements Runnable {
        private String name;
        public Task(String name) {
            this.name = name;
        }
        public void run() {
            for(int i = 0; i < 5; i++) {
                System.out.println(name + " is running, iteration: " + i);
            }
        }
    }
    
    public class Main {
        public static void main(String[] args) {
            Thread t1 = new Thread(new Task("Thread 1"));
            Thread t2 = new Thread(new Task("Thread 2"));
    
            t1.start();
            t2.start();
        }
    }
    
  • Thread.sleep:
    • The Thread.sleep method pauses the execution of the current thread for a specified time in milliseconds.
    try {
        Thread.sleep(1000); // Sleep for 1 second
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    

    Pros:

    • Simple way to delay thread execution.

    Cons:

    • May not be precise due to system scheduler limitations.
    • Can lead to inefficient resource use if overused.
  • Manage Thread Stop Yourself for Runnable:
    • To stop a thread gracefully, you can use a flag that the Runnable checks periodically.
    class StoppableTask implements Runnable {
        private volatile boolean running = true;
    
        public void run() {
            while (running) {
                System.out.println("Task is running...");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            System.out.println("Task stopped.");
        }
    
        public void stop() {
            running = false;
        }
    }
    
    public class Main {
        public static void main(String[] args) throws InterruptedException {
            StoppableTask task = new StoppableTask();
            Thread t = new Thread(task);
            t.start();
    
            Thread.sleep(2000); // Let the task run for a bit
            task.stop(); // Request the task to stop
            t.join(); // Wait for the task to finish
        }
    }
    
  • Explain Java Daemon Thread, Show Code:
    • A daemon thread runs in the background and does not prevent the JVM from exiting once all user threads have finished.
    public class DaemonThreadExample {
        public static void main(String[] args) {
            Thread daemonThread = new Thread(() -> {
                while (true) {
                    System.out.println("Daemon thread is running...");
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            daemonThread.setDaemon(true);
            daemonThread.start();
    
            System.out.println("Main thread is finished.");
        }
    }
    

    Pros:

    • Useful for background tasks (e.g., garbage collection, monitoring).

    Cons:

    • May not finish execution if JVM exits before the thread completes.
  • Thread.join Explain:
    • The join method allows one thread to wait for the completion of another.
    Thread t = new Thread(() -> {
        System.out.println("Thread started.");
    });
    
    t.start();
    try {
        t.join(); // Wait for thread t to finish
        System.out.println("Thread finished.");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    

    Pros:

    • Ensures a thread completes before proceeding, useful in synchronization.

    Cons:

    • Can cause delays if the joined thread takes a long time to finish.
  • Java Has OS Level Threads, Python Has…:
    • Java:
      • Java threads are typically mapped to native OS threads, managed by the JVM.
      • Pros: Efficient CPU utilization, leveraging multi-core processors.
      • Cons: Requires significant system resources; limited by the number of OS threads.
    • Python:
      • Python uses the Global Interpreter Lock (GIL) in CPython, which allows only one thread to execute Python bytecode at a time.
      • Pros: Simplifies memory management, safe concurrent access.
      • Cons: Limits true parallel execution of threads, better suited for I/O-bound rather than CPU-bound tasks.
  • These Require Stack Space:
    • Both Java and Python threads require stack space for each thread’s execution context.
    • Pros: Allows separate execution contexts.
    • Cons: Consumes memory; too many threads can lead to resource exhaustion.
  • Java Project LOOM:
    • Aims to introduce lightweight, user-mode threads (fibers) to the JVM, making concurrent programming easier and more efficient.
    • Pros: Reduced resource overhead, potentially thousands of fibers per OS thread.
    • Cons: Still in development, may require changes in how concurrency is approached.
  • JVM:
    • The Java Virtual Machine (JVM) manages Java threads, providing platform independence and handling thread scheduling, memory management, and more.
    • Pros: Cross-platform capabilities, robust thread management.
    • Cons: Potential overhead compared to native code execution, requires tuning for optimal performance.
  • This section outlines various threading concepts and their implementation in Java, including managing multiple threads, understanding daemon threads, and comparing Java and Python’s threading models.

Virtual Threads

Thread Types

  • Virtual threads, as depicted above (source), are a concept in concurrent programming, particularly relevant in modern programming languages and runtime environments.
  • Virtual threads are a lighter-weight abstraction, often managed at the language runtime level rather than by the operating system. This can also be referred to as user-mode threads or green threads in some contexts.
  • They are designed to be much less resource-intensive compared to traditional threads, allowing the creation of a much larger number of them.
  • Virtual threads can be scheduled and managed by the runtime environment, which can be more efficient than relying solely on the OS for context switching and scheduling.
  • In this example, Executors.newVirtualThreadPerTaskExecutor() creates an executor that uses virtual threads. Each task is executed in its own virtual thread, and the virtual threads are much lighter than traditional OS threads.
import java.util.concurrent.Executors;

public class VirtualThreadsExample {
    public static void main(String[] args) {
        var executor = Executors.newVirtualThreadPerTaskExecutor();

        for (int i = 0; i < 10; i++) {
            executor.submit(() -> {
                System.out.println("Running in virtual thread: " + Thread.currentThread());
                try {
                    Thread.sleep(1000); // Simulate work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        executor.close(); // Properly shutdown the executor
    }
}
  • Platform Threads:
    • Use When: CPU-bound tasks, low to moderate concurrency, requiring OS-level interactions, real-time requirements.
    • Characteristics: Heavyweight, blocking operations block OS threads, managed by OS, higher context switching cost.
  • Virtual Threads:
    • Use When: High concurrency, I/O-bound applications, simplified concurrency management, resource efficiency.
    • Characteristics: Lightweight, blocking operations do not block OS threads, managed by Java runtime, lower context switching cost.

Java (Using Project Loom for Virtual Threads)

Project Loom introduces virtual threads to the Java ecosystem, allowing lightweight concurrency. Below is a simple example:

import java.util.concurrent.Executors;

public class VirtualThreadsExample {
    public static void main(String[] args) {
        var executor = Executors.newVirtualThreadPerTaskExecutor();

        for (int i = 0; i < 10; i++) {
            executor.submit(() -> {
                System.out.println("Running in virtual thread: " + Thread.currentThread());
                try {
                    Thread.sleep(1000); // Simulate work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        executor.close(); // Properly shutdown the executor
    }
}
  • In this example, Executors.newVirtualThreadPerTaskExecutor() creates an executor that uses virtual threads. Each task is executed in its own virtual thread, and the virtual threads are much lighter than traditional OS threads.

Python (Using asyncio)

Python does not have native virtual threads like Java’s Project Loom, but asyncio provides a way to write concurrent code using coroutines, which can be thought of as lightweight “threads” managed by the Python runtime.

import asyncio

async def task():
    print(f"Running in task: {asyncio.current_task().get_name()}")
    await asyncio.sleep(1)  # Simulate work

async def main():
    tasks = [task() for _ in range(10)]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())
  • In this Python example, asyncio is used to run multiple coroutines concurrently. The asyncio.gather() function runs all the coroutines concurrently, and asyncio.run() is used to start the event loop.

Key Differences and Similarities:

  • Java Virtual Threads: With Project Loom, Java introduces virtual threads that are lightweight and managed by the Java runtime. They allow for a synchronous programming style while still achieving high concurrency.
  • Python asyncio: Python’s asyncio provides an asynchronous programming model using coroutines. While not exactly the same as Java’s virtual threads, they achieve a similar effect by allowing the Python runtime to manage the execution of concurrent tasks without the need for traditional threading.

OS Thread vs Platform Thread vs Virtual Thread

  • The image below (source) displays the three tiers of threads in Java: OS Threads, Platform Threads, and Virtual Threads. Thread Types

OS Threads

  • Definition: OS threads are managed directly by the operating system. Each Java thread corresponds to an OS thread, which handles scheduling and execution.
  • Pros:
    • Leverage multi-core processors effectively.
    • Full support from the operating system for features like scheduling and prioritization.
  • Cons:
    • High memory overhead due to stack space and context switching.
    • Limited by the number of threads the OS can efficiently manage, leading to potential scalability issues.

Platform Threads

  • Definition: Platform threads, also known as native threads, are the traditional Java threads managed by the JVM. They are essentially wrappers around OS threads, providing platform independence.
  • Pros:
    • Cross-platform compatibility and ease of use within the Java ecosystem.
    • Utilizes the OS for low-level thread management, benefiting from native optimizations.
  • Cons:
    • Similar to OS threads, they are resource-intensive.
    • Can suffer from context-switching overhead, which impacts performance, especially with a high number of threads.

Virtual Threads

  • Definition: Virtual threads are a newer concept introduced with Project Loom in Java. They are lightweight, user-mode threads managed by the JVM rather than the OS.
  • Pros:
    • Low overhead, allowing for a large number of threads (potentially millions) without significant performance degradation.
    • Easier and more efficient handling of concurrency, particularly in applications with high I/O-bound workloads.
  • Cons:
    • Still in development and not yet widely adopted or available in all Java environments.
    • May require rethinking traditional concurrency patterns and practices.

Summary:

  • OS and Platform Threads are similar in terms of functionality and overhead, with the primary distinction being that OS threads are directly managed by the operating system, while platform threads are managed by the JVM.
  • Virtual Threads offer a promising future for Java concurrency, with potential to vastly improve scalability and performance by significantly reducing the resource footprint of each thread.

  • This breakdown provides an understanding of the different types of threads available in Java and their respective advantages and disadvantages.

Memory

How Java Threads Access Memory

  • Memory Access Mechanism:
    • Java threads access memory through the Java Virtual Machine (JVM), which manages memory allocation and garbage collection. Each thread has its own stack for storing local variables, method call information, and control flow. Shared objects are stored in the heap, which all threads can access.

Java Code

  • Code to Illustrate Memory Access:

    // Class representing a shared resource accessed by multiple threads
    class SharedResource {
        private int counter = 0;
    
        // Synchronized method to safely increment the counter
        public synchronized void increment() {
            counter++;
        }
    
        // Method to retrieve the current value of the counter
        public int getCounter() {
            return counter;
        }
    }
    
    public class MemoryExample {
        public static void main(String[] args) {
            // SharedResource object that will be accessed by multiple threads
            SharedResource resource = new SharedResource();
    
            // First thread that increments the counter
            Thread t1 = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    resource.increment(); // Safely increment counter
                }
            });
    
            // Second thread that also increments the counter
            Thread t2 = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    resource.increment(); // Safely increment counter
                }
            });
    
            // Start both threads
            t1.start();
            t2.start();
    
            try {
                // Wait for both threads to complete
                t1.join();
                t2.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            // Print the final value of the counter
            System.out.println("Final counter value: " + resource.getCounter());
        }
    }
    
    • In this example, the SharedResource class contains a counter variable. The increment method is synchronized to ensure thread-safe access when multiple threads modify the counter simultaneously.

Thread and Thread Stack and Heap

  • Thread:
    • Each Java thread has its own stack, which is separate from other threads. This stack stores local variables, method call frames, and control information.
  • Thread Stack:
    • The thread stack is specific to each thread, ensuring that local variables and method calls are isolated between threads, maintaining thread safety for these variables.
    • The stack holds primitive data types and references to objects in the heap.
  • Heap:
    • The heap is a shared memory area where objects are allocated. All threads in a Java application can access the heap, making it possible for threads to share objects and communicate.
    • The JVM’s garbage collector manages heap memory, reclaiming space used by objects that are no longer in use.

Now How Does Memory Look on Hardware Architecture:

  • CPU:
    • Thread and CPU Registers:
      • CPU registers are the fastest memory components and store immediate values for computations. Each thread has its own set of registers while running on a CPU core.
  • L1, L2, L3 Cache:
    • L1 Cache:
      • The smallest and fastest cache, located closest to the CPU cores, storing frequently accessed data and instructions.
    • L2 Cache:
      • Larger than L1 but slower, serving as a buffer between the L1 cache and the main memory.
    • L3 Cache:
      • The largest and slowest cache, shared among multiple cores, providing a last-level cache before accessing main memory.
  • RAM:
    • Thread Stack:
      • Part of RAM where each thread’s stack is stored, containing local variables and method call frames.
    • Heap:
      • The main area in RAM where all Java objects are allocated. The heap is shared among all threads, allowing for object sharing and communication between threads. The JVM manages the heap, handling object allocation and garbage collection.
  • This overview covers how Java threads interact with memory, including the roles of the thread stack, heap, and various hardware memory components.

Synchronized Exchanger Class

  • Concept: A Synchronized Exchanger class is used to safely exchange data between two threads, ensuring that both threads synchronize their operations when exchanging data.
class SynchronizedExchanger {
    private Object sharedData; // Shared data to be exchanged
    private boolean isEmpty = true; // Flag to check if data is available

    // Method to set data in the exchanger, synchronized to ensure thread safety
    public synchronized void set(Object data) {
        while (!isEmpty) {
            try {
                wait(); // Wait until the data is consumed
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // Restore interrupt status
            }
        }
        isEmpty = false;
        this.sharedData = data;
        notifyAll(); // Notify waiting threads that data is set
    }

    // Method to get data from the exchanger, synchronized to ensure thread safety
    public synchronized Object get() {
        while (isEmpty) {
            try {
                wait(); // Wait until the data is available
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // Restore interrupt status
            }
        }
        isEmpty = true;
        Object data = sharedData;
        notifyAll(); // Notify waiting threads that data has been consumed
        return data;
    }
}

public class ExchangerExample {
    public static void main(String[] args) {
        SynchronizedExchanger exchanger = new SynchronizedExchanger();

        // Producer thread sets data in the exchanger
        new Thread(() -> {
            exchanger.set("Data from producer");
            System.out.println("Producer set data.");
        }).start();

        // Consumer thread gets data from the exchanger
        new Thread(() -> {
            Object data = exchanger.get();
            System.out.println("Consumer received: " + data);
        }).start();
    }
}

Java Volatile

  • Concept: The volatile keyword ensures that updates to a variable are immediately visible to all threads, preventing caching of the variable’s value.
  • In Java, the volatile keyword is used to mark a variable as being stored in main memory. More precisely, it ensures that reads and writes to that variable are directly made to and from the main memory, rather than being cached locally in the CPU cache of a thread. This guarantees visibility of changes to variables across threads.
public class VolatileExample {
    private volatile boolean running = true; // Volatile variable to ensure visibility

    // Method to stop the thread safely
    public void stopRunning() {
        running = false; // Update running variable, immediately visible to other threads
    }

    // Method representing the thread's task
    public void run() {
        while (running) {
            System.out.println("Thread is running");
        }
        System.out.println("Thread stopped");
    }

    public static void main(String[] args) throws InterruptedException {
        VolatileExample example = new VolatileExample();
        Thread t = new Thread(example::run);
        t.start();
        Thread.sleep(1000); // Let the thread run for a bit
        example.stopRunning(); // Stop the thread
        t.join(); // Wait for the thread to finish
    }
}

Java ThreadLocal

  • Concept: ThreadLocal provides thread-local variables, ensuring that each thread accessing such a variable has its own independent copy.
public class ThreadLocalExample {
    // ThreadLocal variable with an initial value
    private static final ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 1);

    public static void main(String[] args) {
        // First thread sets and gets the ThreadLocal variable
        Thread t1 = new Thread(() -> {
            threadLocal.set(100); // Set thread-specific value
            System.out.println("Thread 1: " + threadLocal.get()); // Get thread-specific value
        });

        // Second thread sets and gets the ThreadLocal variable
        Thread t2 = new Thread(() -> {
            threadLocal.set(200); // Set thread-specific value
            System.out.println("Thread 2: " + threadLocal.get()); // Get thread-specific value
        });

        t1.start();
        t2.start();
    }
}

Race Conditions in Multithreading

  • Concept: Race conditions occur when two or more threads access shared data concurrently and attempt to modify it. This can lead to inconsistent or unexpected results.
class Counter {
    private int count = 0; // Shared variable

    // Method to increment the count, not synchronized (potential race condition)
    public void increment() {
        count++;
    }

    public int getCount() {
        return count;
    }
}

public class RaceConditionExample {
    public static void main(String[] args) throws InterruptedException {
        Counter counter = new Counter();

        // Thread 1 incrementing the counter
        Thread t1 = new Thread(counter::increment);

        // Thread 2 incrementing the counter
        Thread t2 = new Thread(counter::increment);

        t1.start();
        t2.start();
        t1.join();
        t2.join();

        // Output may not be as expected due to race condition
        System.out.println("Final count: " + counter.getCount());
    }
}

Concurrency vs Parallelism

  • Concept:
    • Concurrency: Making progress on more than one task by interleaving execution, not necessarily at the same time.
    • Parallelism: Executing multiple tasks simultaneously using multiple processors or cores.
public class ConcurrencyParallelismExample {
    public static void main(String[] args) {
        // Task 1 for demonstrating concurrency
        Runnable task1 = () -> {
            for (int i = 0; i < 5; i++) {
                System.out.println("Task 1 - Count: " + i);
                try {
                    Thread.sleep(500); // Simulate time-consuming work
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        // Task 2 for demonstrating concurrency
        Runnable task2 = () -> {
            for (int i = 0; i < 5; i++) {
                System.out.println("Task 2 - Count: " + i);
                try {
                    Thread.sleep(500); // Simulate time-consuming work
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        // Running tasks in parallel using separate threads
        Thread thread1 = new Thread(task1);
        Thread thread2 = new Thread(task2);

        thread1.start();
        thread2.start();
    }
}

Critical Section

  • Concept:
    • Atomic: Operations that appear indivisible; no other operations can see an intermediate state.
    • Synchronized Block: A block of code that is synchronized on a given object, ensuring that only one thread can execute it at a time.
public class CriticalSectionExample {
    private int count = 0;

    // Synchronized method to increment the count
    public synchronized void increment() {
        count++; // Only one thread can execute this at a time
    }

    public int getCount() {
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        CriticalSectionExample example = new CriticalSectionExample();
        Thread t1 = new Thread(example::increment);
        Thread t2 = new Thread(example::increment);

        t1.start();
        t2.start();
        t1.join();
        t2.join();

        // Print final count (expected to be 2 if no race condition occurs)
        System.out.println("Final count: " + example.getCount());
    }
}

Deadlock

  • Concept: A situation where two or more threads are blocked forever, each waiting on the other to release a resource.
public class DeadlockExample {
    private final Object lock1 = new Object(); // Lock 1
    private final Object lock2 = new Object(); // Lock 2

    // Method that acquires lock1 and then lock2
    public void method1() {
        synchronized (lock1) {
            System.out.println("Method1 acquired lock1");
            synchronized (lock2) {
                System.out.println("Method1 acquired lock2");
            }
        }
    }

    // Method that acquires lock2 and then lock1
    public void method2() {
        synchronized (lock2) {
            System.out.println("Method2 acquired lock2");
            synchronized (lock1) {
                System.out.println("Method2 acquired lock1");
            }
        }
    }

    public static void main(String[] args) {
        DeadlockExample example = new DeadlockExample();

        // Thread 1 attempts to lock lock1 then lock2
        new Thread(example::method1).start();

        // Thread 2 attempts to lock lock2 then lock1
        new Thread(example::method2).start();
    }
}

Threadpool

  • Concept: A pool of pre-instantiated reusable threads. A thread pool is used to manage a set of worker threads and to manage the execution of tasks.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolExample {
    public static void main(String[] args) {
        // Create a thread pool with 3 threads
        ExecutorService executor = Executors.newFixedThreadPool(3);

        // Define tasks to be executed by the thread pool
        Runnable task1 = () -> System.out.println("Task 1 executed");
        Runnable task2 = () -> System.out.println("Task 2 executed");
        Runnable task3 = () -> System.out.println("Task 3 executed");

        // Submit tasks to the thread pool
        executor.execute(task1);
        executor.execute(task2);
        executor.execute(task3);

        // Shutdown the executor service gracefully
        executor.shutdown();
    }
}

Java Lock

  • Concept: Provides more sophisticated thread synchronization than `

synchronized` blocks. It offers features like tryLock() and lockInterruptibly().

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockExample {
    private final Lock lock = new ReentrantLock(); // Lock instance
    private int count = 0;

    // Method to increment the count with explicit locking
    public void increment() {
        lock.lock(); // Acquire the lock
        try {
            count++; // Critical section
        } finally {
            lock.unlock(); // Ensure the lock is released
        }
    }

    public int getCount() {
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        LockExample example = new LockExample();
        Thread t1 = new Thread(example::increment);
        Thread t2 = new Thread(example::increment);

        t1.start();
        t2.start();
        t1.join();
        t2.join();

        // Print final count
        System.out.println("Final count: " + example.getCount());
    }
}

Executor Service

  • Concept: A framework for managing a pool of threads, handling the scheduling and execution of asynchronous tasks.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorServiceExample {
    public static void main(String[] args) {
        // Create an ExecutorService with a fixed thread pool of 2 threads
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // Define tasks to be executed
        Runnable task1 = () -> System.out.println("Task 1 executed");
        Runnable task2 = () -> System.out.println("Task 2 executed");

        // Submit tasks to the executor
        executor.submit(task1);
        executor.submit(task2);

        // Shutdown the executor service
        executor.shutdown();
    }
}

Blocking Queue

  • Concept: A thread-safe queue that supports operations that wait for the queue to become non-empty when retrieving and wait for space to become available in the queue when storing.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueExample {
    public static void main(String[] args) {
        // Create a blocking queue with a capacity of 10
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

        // Producer thread that adds elements to the queue
        Thread producer = new Thread(() -> {
            try {
                queue.put("Element"); // Add an element to the queue
                System.out.println("Element added");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // Handle interrupt
            }
        });

        // Consumer thread that takes elements from the queue
        Thread consumer = new Thread(() -> {
            try {
                String element = queue.take(); // Take an element from the queue
                System.out.println("Element removed: " + element);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // Handle interrupt
            }
        });

        producer.start();
        consumer.start();
    }
}

Consumer Pattern

  • Concept: A design pattern where a producer creates data and a consumer processes it, typically using a queue to manage the data flow.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerExample {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5); // Create a blocking queue

        // Producer thread to produce data
        new Thread(() -> {
            try {
                String[] data = {"one", "two", "three", "four", "five"};
                for (String element : data) {
                    queue.put(element); // Put elements into the queue
                    System.out.println("Produced: " + element);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();

        // Consumer thread to consume data
        new Thread(() -> {
            try {
                while (true) {
                    String element = queue.take(); // Take elements from the queue
                    System.out.println("Consumed: " + element);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}

Thread Congestion

  • Concept: Occurs when too many threads are competing for limited resources, leading to performance degradation.
public class ThreadCongestionExample {
    public static void main(String[] args) {
        // Task to be run by many threads
        Runnable task = () -> {
            for (int i = 0; i < 5; i++) {
                System.out.println(Thread.currentThread().getName() + " is running");
                try {
                    Thread.sleep(100); // Simulate some work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // Handle interrupt
                }
            }
        };

        // Creating too many threads can lead to congestion
        for (int i = 0; i < 100; i++) {
            new Thread(task).start();
        }
    }
}

Thread Signal

  • Concept: Mechanisms such as wait(), notify(), and notifyAll() are used for thread communication, allowing threads to signal each other for synchronization purposes.
public class ThreadSignalExample {
    private static final Object lock = new Object(); // Lock object for synchronization

    public static void main(String[] args) {
        // Thread that waits for a signal
        Thread waitingThread = new Thread(() -> {
            synchronized (lock) {
                try {
                    System.out.println("Waiting thread waiting...");
                    lock.wait(); // Wait for a signal
                    System.out.println("Waiting thread resumed");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // Handle interrupt
                }
            }
        });

        // Thread that sends a signal
        Thread notifyingThread = new Thread(() -> {
            synchronized (lock) {
                System.out.println("Notifying thread sending signal...");
                lock.notify(); // Send a signal
            }
        });

        waitingThread.start();
        notifyingThread.start();
    }
}

Real-time use cases

Implement scheduled thread pool executor

  • Thread pool is a collection of pre-created reusable threads that can be used to execute tasks. A thread pool reuses existing threads to execute multiple tasks.
  • The executor is a high-level interface for managing and controlling thread execution.

import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolExecutorExample{
    
    public static void main(String[] args){
    
      ScheduledThreadPoolExecutorService scheduler = Executor.newScheduledThreadPool(2);
      
      // Schedule a task to run after 5 sec delay
      
      scheduler.schedule(() -> {
      
        System.out.println("Task executed after 5 sec delay" + Thread.currentThread().getName());
        
      }, 5, TimeUnit.SECONDS);
      
      // Await termination of scheduler
      try {
      
        if (!scheduler.awaitTermination(25, TimeUnits.SECONDS)) {
            scheduler.shutDownNow();
        } catch (InterruptedException e){
            scheduler.shutdownNow();
        }
        
      
      }
    
    }

}




import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

public class CustomScheduledThreadPoolExecutor {

    // Task queue for immediate execution tasks
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();

    // Priority queue for scheduled tasks, sorted by execution time
    private final PriorityBlockingQueue<ScheduledTask> scheduledTaskQueue = new PriorityBlockingQueue<>();

    // Array of worker threads
    private final Thread[] workers;

    // Constructor to initialize the thread pool with a given size
    public CustomScheduledThreadPoolExecutor(int poolSize) {
        workers = new Thread[poolSize];

        // Start worker threads
        for (int i = 0; i < poolSize; i++) {
            workers[i] = new Worker();
            workers[i].start();
        }

        // Start the scheduler thread
        new Scheduler().start();
    }

    // Method to schedule a one-time task after a delay
    public void schedule(Runnable task, long delayMillis) {
        scheduledTaskQueue.offer(new ScheduledTask(task, System.currentTimeMillis() + delayMillis));
    }

    // Method to schedule a repeating task with a fixed rate
    public void scheduleAtFixedRate(Runnable task, long initialDelayMillis, long periodMillis) {
        scheduledTaskQueue.offer(new ScheduledTask(task, System.currentTimeMillis() + initialDelayMillis, periodMillis));
    }

    // Method to shut down the executor
    public void shutdown() {
        for (Thread worker : workers) {
            worker.interrupt(); // Interrupt each worker thread
        }
    }

    // Worker thread class responsible for executing tasks from the task queue
    private class Worker extends Thread {
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // Take a task from the task queue and execute it
                    Runnable task = taskQueue.take();
                    task.run();
                }
            } catch (InterruptedException e) {
                // Thread interrupted, exit gracefully
            }
        }
    }

    // Scheduler thread class responsible for moving tasks from the scheduled queue to the task queue
    private class Scheduler extends Thread {
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // Take the next scheduled task
                    ScheduledTask scheduledTask = scheduledTaskQueue.take();
                    long delay = scheduledTask.getExecutionTime() - System.currentTimeMillis();
                    
                    if (delay > 0) {
                        // If the task is not ready to be executed, sleep for the remaining delay
                        Thread.sleep(delay);
                    }
                    
                    // Move the task to the task queue for execution
                    taskQueue.offer(scheduledTask.getTask());
                    
                    if (scheduledTask.isRepeating()) {
                        // If the task is repeating, schedule it again
                        scheduledTask.setExecutionTime(System.currentTimeMillis() + scheduledTask.getPeriod());
                        scheduledTaskQueue.offer(scheduledTask);
                    }
                }
            } catch (InterruptedException e) {
                // Thread interrupted, exit gracefully
            }
        }
    }

    // Class representing a scheduled task with a comparable interface for priority queue ordering
    private static class ScheduledTask implements Comparable<ScheduledTask> {
        private final Runnable task;
        private long executionTime;
        private final long period;

        // Constructor for one-time tasks
        public ScheduledTask(Runnable task, long executionTime) {
            this(task, executionTime, -1);
        }

        // Constructor for repeating tasks
        public ScheduledTask(Runnable task, long executionTime, long period) {
            this.task = task;
            this.executionTime = executionTime;
            this.period = period;
        }

        public Runnable getTask() {
            return task;
        }

        public long getExecutionTime() {
            return executionTime;
        }

        public void setExecutionTime(long executionTime) {
            this.executionTime = executionTime;
        }

        public boolean isRepeating() {
            return period > 0;
        }

        public long getPeriod() {
            return period;
        }

        @Override
        public int compareTo(ScheduledTask other) {
            // Compare tasks based on their scheduled execution time
            return Long.compare(this.executionTime, other.executionTime);
        }
    }

    public static void main(String[] args) {
        // Create a custom scheduled thread pool executor with 2 worker threads
        CustomScheduledThreadPoolExecutor executor = new CustomScheduledThreadPoolExecutor(2);

        // Schedule a one-time task to run after 5 seconds
        executor.schedule(() -> System.out.println("Task executed after 5 seconds delay"), 5000);

        // Schedule a periodic task to run every 3 seconds
        executor.scheduleAtFixedRate(() -> System.out.println("Periodic task executed"), 0, 3000);

        // Schedule a task to shut down the executor after 20 seconds
        executor.schedule(() -> {
            System.out.println("Shutting down executor");
            executor.shutdown();
        }, 20000);
    }
}


Thread-safe LRU Cache

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ThreadSafeLRUCache<K, V> {
    private final int capacity;
    private final Map<K, Node<K, V>> map;
    private final DoublyLinkedList<K, V> list;
    private final Lock lock = new ReentrantLock();

    // Constructor to initialize the LRU Cache with a given capacity
    public ThreadSafeLRUCache(int capacity) {
        this.capacity = capacity;
        this.map = new HashMap<>();
        this.list = new DoublyLinkedList<>();
    }

    // Method to get a value from the cache
    public V get(K key) {
        lock.lock();
        try {
            if (!map.containsKey(key)) {
                return null; // Return null if the key is not present
            }
            Node<K, V> node = map.get(key);
            list.moveToHead(node); // Move the accessed node to the head (most recently used)
            return node.value;
        } finally {
            lock.unlock(); // Ensure the lock is released
        }
    }

    // Method to put a key-value pair into the cache
    public void put(K key, V value) {
        lock.lock();
        try {
            if (map.containsKey(key)) {
                Node<K, V> node = map.get(key);
                node.value = value;
                list.moveToHead(node); // Move the updated node to the head
            } else {
                if (map.size() == capacity) {
                    Node<K, V> tail = list.removeTail(); // Remove the least recently used item
                    map.remove(tail.key);
                }
                Node<K, V> newNode = new Node<>(key, value);
                list.addToHead(newNode); // Add the new node to the head
                map.put(key, newNode);
            }
        } finally {
            lock.unlock(); // Ensure the lock is released
        }
    }

    // Node class representing each entry in the doubly linked list
    private static class Node<K, V> {
        K key;
        V value;
        Node<K, V> prev;
        Node<K, V> next;

        Node(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    // Doubly linked list to maintain the order of usage
    private static class DoublyLinkedList<K, V> {
        private Node<K, V> head;
        private Node<K, V> tail;

        DoublyLinkedList() {
            head = new Node<>(null, null);
            tail = new Node<>(null, null);
            head.next = tail;
            tail.prev = head;
        }

        // Add a node to the head (most recently used)
        void addToHead(Node<K, V> node) {
            node.next = head.next;
            node.prev = head;
            head.next.prev = node;
            head.next = node;
        }

        // Remove a node from the list
        void removeNode(Node<K, V> node) {
            node.prev.next = node.next;
            node.next.prev = node.prev;
        }

        // Move a node to the head
        void moveToHead(Node<K, V> node) {
            removeNode(node);
            addToHead(node);
        }

        // Remove the tail node (least recently used)
        Node<K, V> removeTail() {
            Node<K, V> res = tail.prev;
            removeNode(res);
            return res;
        }
    }

    // Main method for testing the LRU Cache
    public static void main(String[] args) {
        ThreadSafeLRUCache<Integer, String> cache = new ThreadSafeLRUCache<>(2);
        cache.put(1, "one");
        cache.put(2, "two");
        System.out.println(cache.get(1)); // prints "one"
        cache.put(3, "three"); // evicts key 2
        System.out.println(cache.get(2)); // prints "null"
        cache.put(4, "four"); // evicts key 1
        System.out.println(cache.get(1)); // prints "null"
        System.out.println(cache.get(3)); // prints "three"
        System.out.println(cache.get(4)); // prints "four"
    }
}

Implement custom Executor class

  • Details of the class
    1. Submit a task and specify an ETA, and run the task at the ETA
    2. Cancel the task (if the task has not yet started.)
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.HashMap;
import java.util.Map;
import java.time.Duration;
import java.time.Instant;

public class CustomExecutor {
    // ScheduledExecutorService to handle the scheduling of tasks
    private final ScheduledExecutorService scheduler;
    
    // Map to store tasks with their IDs for easy cancellation
    private final Map<Integer, ScheduledFuture<?>> tasks;
    
    // Counter to generate unique task IDs
    private int taskCounter;
    
    // Lock object to ensure thread-safe operations on task map and counter
    private final Object lock = new Object();

    // Constructor to initialize the scheduler and task map
    public CustomExecutor() {
        this.scheduler = Executors.newScheduledThreadPool(1);
        this.tasks = new HashMap<>();
        this.taskCounter = 0;
    }

    /**
     * Submit a task to be executed at a specified ETA.
     * @param task The task to be executed.
     * @param eta The estimated time of arrival (when the task should run).
     * @return The unique ID of the scheduled task.
     */
    public int submit(Runnable task, Instant eta) {
        synchronized (lock) {
            // Increment the task counter for unique task ID
            taskCounter++;
            int currentTaskId = taskCounter;
            
            // Calculate the delay in milliseconds from now until ETA
            long delay = Duration.between(Instant.now(), eta).toMillis();
            
            // If ETA is in the past, execute the task immediately
            if (delay < 0) {
                delay = 0;
            }
            
            // Schedule the task with the calculated delay
            ScheduledFuture<?> scheduledTask = scheduler.schedule(task, delay, TimeUnit.MILLISECONDS);
            
            // Store the task in the map with its unique ID
            tasks.put(currentTaskId, scheduledTask);
            return currentTaskId;
        }
    }

    /**
     * Cancel a scheduled task before it starts.
     * @param taskId The unique ID of the task to cancel.
     * @return True if the task was successfully cancelled, false otherwise.
     */
    public boolean cancel(int taskId) {
        synchronized (lock) {
            // Retrieve the scheduled task from the map
            ScheduledFuture<?> scheduledTask = tasks.remove(taskId);
            
            // If the task is found, attempt to cancel it
            if (scheduledTask != null) {
                return scheduledTask.cancel(false);
            }
            return false;
        }
    }

    /**
     * Shut down the executor, ensuring all tasks are properly handled.
     */
    public void shutdown() {
        scheduler.shutdown();
        try {
            // Wait for 1 second for existing tasks to complete
            if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
                // Force shutdown if tasks did not complete in time
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            // Handle interruption and force shutdown
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    // Main method for demonstration purposes
    public static void main(String[] args) throws InterruptedException {
        // Create an instance of CustomExecutor
        CustomExecutor executor = new CustomExecutor();

        // Define a simple task to be executed
        Runnable task = () -> System.out.println("Task executed!");

        // Set ETA to 10 seconds from now
        Instant eta = Instant.now().plusSeconds(10);

        // Submit the task and get the task ID
        int taskId = executor.submit(task, eta);
        System.out.println("Task scheduled with ID: " + taskId);

        // Wait for 5 seconds before attempting to cancel the task
        Thread.sleep(5000);

        // Attempt to cancel the task
        if (executor.cancel(taskId)) {
            System.out.println("Task " + taskId + " cancelled.");
        } else {
            System.out.println("Failed to cancel task " + taskId + ".");
        }

        // Shut down the executor
        executor.shutdown();
    }
}

Sure, I’ll provide more detailed explanations and concise code examples for each topic.

1. Thread Synchronization with wait/notify/notifyAll

Explanation:

In Java, wait, notify, and notifyAll are methods of the Object class that are used for thread synchronization. These methods help coordinate the actions of multiple threads that share a common resource.

  • wait(): This method causes the current thread to wait until another thread calls notify() or notifyAll() on the same object. The thread releases ownership of the monitor and waits.
  • notify(): This method wakes up a single thread that is waiting on the object’s monitor. If multiple threads are waiting, one of them is chosen to be awakened.
  • notifyAll(): This method wakes up all threads that are waiting on the object’s monitor.

These methods must be called within a synchronized context, meaning the thread must hold the monitor lock of the object on which the method is being called.

Code Example:

class SharedResource {
    private int value;
    private boolean available = false;

    // Synchronized method for producing values
    public synchronized void produce(int newValue) throws InterruptedException {
        while (available) {
            wait(); // Release the lock and wait
        }
        value = newValue;
        available = true;
        notifyAll(); // Notify all waiting threads
    }

    // Synchronized method for consuming values
    public synchronized int consume() throws InterruptedException {
        while (!available) {
            wait(); // Release the lock and wait
        }
        available = false;
        notifyAll(); // Notify all waiting threads
        return value;
    }
}

public class WaitNotifyExample {
    public static void main(String[] args) {
        SharedResource resource = new SharedResource();
        
        // Producer thread
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    resource.produce(i);
                    System.out.println("Produced: " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumer thread
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    int value = resource.consume();
                    System.out.println("Consumed: " + value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}

2. Spinlock vs. Mutex

Explanation:

  • Spinlock: A spinlock is a lock where the thread attempting to acquire the lock repeatedly checks if the lock is available. This “spinning” continues until the lock becomes available.
    • Advantages: Spinlocks can be more efficient than traditional locks if the wait time is expected to be very short because they avoid the overhead associated with putting a thread to sleep and waking it up.
    • Disadvantages: Spinlocks can waste CPU cycles because they keep the CPU busy while waiting for the lock.
  • Mutex (Mutual Exclusion): A mutex is a synchronization primitive that is used to prevent multiple threads from simultaneously executing critical sections of code which can lead to race conditions.
    • Advantages: Mutexes do not consume CPU resources while waiting because the waiting threads are put to sleep.
    • Disadvantages: Mutexes can incur context switching overhead, which can be expensive in terms of performance if the lock contention is high.

Spinlock Code Example:

import java.util.concurrent.atomic.AtomicBoolean;

public class SpinLock {
    private final AtomicBoolean lock = new AtomicBoolean(false);

    public void lock() {
        // Spin-wait (busy-wait) until the lock is acquired
        while (!lock.compareAndSet(false, true)) {
            // Do nothing, just keep trying
        }
    }

    public void unlock() {
        lock.set(false);
    }

    public static void main(String[] args) {
        SpinLock spinLock = new SpinLock();

        // Example usage of SpinLock
        Runnable task = () -> {
            spinLock.lock();
            try {
                // critical section
                System.out.println(Thread.currentThread().getName() + " acquired the lock");
            } finally {
                spinLock.unlock();
                System.out.println(Thread.currentThread().getName() + " released the lock");
            }
        };

        Thread t1 = new Thread(task, "Thread 1");
        Thread t2 = new Thread(task, "Thread 2");
        t1.start();
        t2.start();
    }
}

Mutex Code Example (using ReentrantLock):

import java.util.concurrent.locks.ReentrantLock;

public class MutexExample {
    private final ReentrantLock lock = new ReentrantLock();

    public void doWork() {
        lock.lock(); // Acquire the lock
        try {
            // Critical section
            System.out.println(Thread.currentThread().getName() + " acquired the lock");
        } finally {
            lock.unlock(); // Always release the lock in a finally block
            System.out.println(Thread.currentThread().getName() + " released the lock");
        }
    }

    public static void main(String[] args) {
        MutexExample example = new MutexExample();

        // Example usage of ReentrantLock
        Runnable task = example::doWork;

        Thread t1 = new Thread(task, "Thread 1");
        Thread t2 = new Thread(task, "Thread 2");
        t1.start();
        t2.start();
    }
}

3. Hybrid Mutex

Explanation:

A hybrid mutex combines the properties of spinlocks and traditional mutexes. It spins for a short period and then falls back to blocking if the lock is not acquired quickly. This approach can reduce the overhead of context switching while avoiding the CPU consumption of long spin-waits.

Code Example:

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

public class HybridMutex {
    private final AtomicBoolean spinLock = new AtomicBoolean(false);
    private final ReentrantLock lock = new ReentrantLock();

    public void lock() {
        // Try spin-lock first
        for (int i = 0; i < 100; i++) {
            if (spinLock.compareAndSet(false, true)) {
                return;
            }
        }
        // Fall back to blocking lock
        lock.lock();
    }

    public void unlock() {
        if (spinLock.get()) {
            spinLock.set(false);
        } else {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        HybridMutex hybridLock = new HybridMutex();

        // Example usage of HybridMutex
        Runnable task = () -> {
            hybridLock.lock();
            try {
                // critical section
                System.out.println(Thread.currentThread().getName() + " acquired the lock");
            } finally {
                hybridLock.unlock();
                System.out.println(Thread.currentThread().getName() + " released the lock");
            }
        };

        Thread t1 = new Thread(task, "Thread 1");
        Thread t2 = new Thread(task, "Thread 2");
        t1.start();
        t2.start();
    }
}

4. ReentrantLock & Condition

Explanation:

  • ReentrantLock: A reentrant mutual exclusion lock with the same basic behavior and semantics as the implicit monitor lock accessed using synchronized methods and statements, but with extended capabilities.
  • Condition: Provides a means for one thread to suspend execution until notified by another thread. Conditions must be associated with a lock and can be used for more complex thread synchronization than the traditional wait/notify mechanism.

Code Example:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class SharedResource {
    private int value;
    private boolean available = false;
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    public void produce(int newValue) throws InterruptedException {
        lock.lock();
        try {
            while (available) {
                condition.await(); // Wait until the resource is available
            }
            value = newValue;
            available = true;
            condition.signalAll(); // Notify all waiting threads
        } finally {
            lock.unlock();
        }
    }

    public int consume() throws InterruptedException {
        lock.lock();
        try {
            while (!available) {
                condition.await(); // Wait until the resource is not available
            }
            available = false;
            condition.signalAll(); // Notify all waiting threads
            return value;
        } finally {
            lock.unlock();
        }
    }
}

public class ConditionExample {
    public static void main(String[] args) {
        SharedResource resource = new SharedResource();

        // Producer thread
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    resource.produce(i);
                    System.out.println("Produced: " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumer thread
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    int value = resource.consume();
                    System.out.println("Consumed: " + value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}

5. Classic Producer

/Consumer Bounded Buffer Queue

Explanation:

A BlockingQueue is used to manage the producer-consumer relationship with automatic synchronization. ArrayBlockingQueue is a bounded blocking queue backed by an array. This ensures thread-safe operations and handles synchronization internally.

Code Example:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class Producer implements Runnable {
    private final BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                queue.put(i); // Add item to the queue
                System.out.println("Produced: " + i);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class Consumer implements Runnable {
    private final BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                int value = queue.take(); // Remove item from the queue
                System.out.println("Consumed: " + value);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

public class BlockingQueueExample {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();
    }
}

6. Two-Lock Queue

Explanation:

A two-lock queue uses separate locks for enqueue and dequeue operations to reduce contention and improve concurrency. This can help in scenarios where both operations are frequently used.

Code Example:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;

class TwoLockQueue<E> {
    private Node<E> head;
    private Node<E> tail;
    private final Lock enqLock = new ReentrantLock();
    private final Lock deqLock = new ReentrantLock();
    private final Condition notEmpty = deqLock.newCondition();

    private static class Node<E> {
        E item;
        Node<E> next;
        Node(E item) {
            this.item = item;
        }
    }

    public void enqueue(E item) {
        enqLock.lock();
        try {
            Node<E> newNode = new Node<>(item);
            if (tail == null) {
                head = tail = newNode;
            } else {
                tail.next = newNode;
                tail = newNode;
            }
        } finally {
            enqLock.unlock();
        }
        deqLock.lock();
        try {
            notEmpty.signal(); // Signal that the queue is not empty
        } finally {
            deqLock.unlock();
        }
    }

    public E dequeue() throws InterruptedException {
        deqLock.lock();
        try {
            while (head == null) {
                notEmpty.await(); // Wait until the queue is not empty
            }
            Node<E> node = head;
            head = head.next;
            if (head == null) {
                tail = null;
            }
            return node.item;
        } finally {
            deqLock.unlock();
        }
    }
}

7. Non-blocking Queue Implementation

Explanation:

A non-blocking queue uses atomic operations to ensure thread safety without locks. Java provides ConcurrentLinkedQueue for this purpose. This queue is thread-safe and uses a non-blocking algorithm to achieve high concurrency.

Code Example:

import java.util.concurrent.ConcurrentLinkedQueue;

public class NonBlockingQueueExample {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();

        // Producer thread
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                queue.add(i); // Add item to the queue
                System.out.println("Produced: " + i);
            }
        }).start();

        // Consumer thread
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                Integer value;
                while ((value = queue.poll()) == null) {
                    // Busy-wait
                }
                System.out.println("Consumed: " + value);
            }
        }).start();
    }
}

8. Semaphore

Explanation:

A semaphore controls access to a shared resource by maintaining a set of permits. Threads acquire permits before accessing the resource and release them afterward. If no permits are available, the acquiring thread blocks until a permit is released.

Code Example:

import java.util.concurrent.Semaphore;

public class SemaphoreExample {
    private final Semaphore semaphore = new Semaphore(1); // Binary semaphore (mutex)

    public void accessResource() throws InterruptedException {
        semaphore.acquire(); // Acquire a permit
        try {
            // critical section
            System.out.println("Resource accessed by " + Thread.currentThread().getName());
        } finally {
            semaphore.release(); // Release the permit
        }
    }

    public static void main(String[] args) {
        SemaphoreExample example = new SemaphoreExample();
        Runnable task = () -> {
            try {
                example.accessResource();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        new Thread(task).start();
        new Thread(task).start();
    }
}

9. HashMap Implementation

Explanation:

To make a HashMap thread-safe using separate chaining, we can use ConcurrentHashMap. This class uses fine-grained locking and non-blocking algorithms to achieve high concurrency. It divides the map into segments, each of which can be locked independently, allowing multiple threads to access different segments concurrently.

Code Example:

import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;

public class ConcurrentHashMapExample {
    public static void main(String[] args) {
        Map<String, Integer> map = new ConcurrentHashMap<>();

        // Add key-value pairs to the map
        map.put("one", 1);
        map.put("two", 2);

        // Retrieve value by key
        System.out.println("Value for key 'one': " + map.get("one"));

        // Remove a key-value pair
        map.remove("two");

        // Check if a key exists
        System.out.println("Contains key 'two': " + map.containsKey("two"));
    }
}
  • These detailed explanations and code examples should provide a clearer understanding of each concurrency topic and how to implement them in Java.

Bounded Buffer Queue with Condition and ReentrantLock

Explanation:

A bounded buffer queue is a data structure that allows multiple producers to add items to the queue and multiple consumers to remove items from the queue. The queue has a fixed size, and it blocks producers when the queue is full and consumers when the queue is empty.

To implement this using Condition and ReentrantLock, we will:

  • Use a ReentrantLock to ensure mutual exclusion.
  • Use two Condition instances to manage the producer and consumer waiting conditions.
  • Use an array to store the buffer elements and manage the buffer indices.

Code Example:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBufferQueue<T> {
    private final T[] buffer;
    private int head, tail, count;
    private final Lock lock;
    private final Condition notFull, notEmpty;

    @SuppressWarnings("unchecked")
    public BoundedBufferQueue(int size) {
        buffer = (T[]) new Object[size];
        head = tail = count = 0;
        lock = new ReentrantLock();
        notFull = lock.newCondition();
        notEmpty = lock.newCondition();
    }

    // Adds an item to the buffer, blocks if the buffer is full
    public void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (count == buffer.length) {
                notFull.await(); // Wait until the buffer is not full
            }
            buffer[tail] = item;
            tail = (tail + 1) % buffer.length;
            count++;
            notEmpty.signal(); // Signal that the buffer is not empty
        } finally {
            lock.unlock();
        }
    }

    // Removes and returns an item from the buffer, blocks if the buffer is empty
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await(); // Wait until the buffer is not empty
            }
            T item = buffer[head];
            head = (head + 1) % buffer.length;
            count--;
            notFull.signal(); // Signal that the buffer is not full
            return item;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        BoundedBufferQueue<Integer> buffer = new BoundedBufferQueue<>(5);

        // Producer thread
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    buffer.put(i);
                    System.out.println("Produced: " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumer thread
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    int value = buffer.take();
                    System.out.println("Consumed: " + value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}

Detailed Explanation:

  1. Initialization:
    • The buffer is initialized with a fixed size.
    • head and tail pointers manage the front and rear of the queue.
    • count keeps track of the number of elements in the buffer.
    • lock is a ReentrantLock to ensure mutual exclusion.
    • notFull and notEmpty are Condition instances used to signal when the buffer is not full and not empty, respectively.
  2. put(T item):
    • Acquires the lock.
    • Checks if the buffer is full (count == buffer.length). If full, it waits (notFull.await()).
    • Adds the item to the buffer at the tail index.
    • Updates the tail index and increments the count.
    • Signals that the buffer is not empty (notEmpty.signal()).
    • Releases the lock.
  3. take():
    • Acquires the lock.
    • Checks if the buffer is empty (count == 0). If empty, it waits (notEmpty.await()).
    • Retrieves the item from the buffer at the head index.
    • Updates the head index and decrements the count.
    • Signals that the buffer is not full (notFull.signal()).
    • Releases the lock.
  4. Main Method:
    • Creates a bounded buffer with a capacity of 5.
    • Starts a producer thread that adds 10 items to the buffer.
    • Starts a consumer thread that retrieves 10 items from the buffer.
  • This implementation ensures that producers block when the buffer is full, and consumers block when the buffer is empty, maintaining synchronization between producer and consumer threads.

Pros and Cons of Mutex/Condition Variable

Mutex (Mutual Exclusion)

Pros:

  1. Thread Safety: Mutexes ensure that only one thread can access the critical section at a time, preventing race conditions.
  2. Blocking: When a thread attempts to acquire a mutex that is already locked, it gets put to sleep, freeing up the CPU for other tasks.
  3. ReentrantLock Features: With ReentrantLock, you get more control compared to synchronized blocks, such as the ability to interrupt a thread waiting for a lock, try acquiring the lock without blocking, and having multiple Condition objects for finer control over synchronization.

Cons:

  1. Deadlocks: If not used carefully, mutexes can lead to deadlocks, where two or more threads are waiting for each other to release locks, causing the application to hang.
  2. Performance Overhead: The blocking and unblocking of threads involve context switches, which can be expensive in terms of performance.
  3. Complexity: Properly managing the acquisition and release of locks can make the code more complex and harder to maintain.

Condition Variables

Pros:

  1. Fine-grained Synchronization: Condition variables allow threads to wait for specific conditions to be met, providing more fine-grained control over thread synchronization.
  2. Wait and Notify: Threads can wait for a condition to become true (await), and other threads can signal when the condition has changed (signal or signalAll), allowing for more efficient communication between threads.
  3. Multiple Conditions: With ReentrantLock, you can create multiple Condition objects for different conditions, making it easier to manage complex synchronization scenarios.

Cons:

  1. Complexity: Using condition variables correctly can be tricky. It requires a good understanding of how conditions and locks interact, which can increase the complexity of the code.
  2. Potential for Spurious Wakeups: Threads waiting on a condition can be woken up without a signal being called (known as a spurious wakeup), requiring the use of loops to recheck the condition.
  3. Deadlocks and Missed Signals: Incorrect use of condition variables can lead to deadlocks or missed signals, where a thread might never be woken up because the signal was sent before the thread started waiting.

Code Example

Let’s look at a simple example that demonstrates the use of ReentrantLock and Condition for a bounded buffer queue.

Example Code with Detailed Comments

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Bounded buffer queue implementation
public class BoundedBufferQueue<T> {
    private final T[] buffer;
    private int head, tail, count;
    private final Lock lock;
    private final Condition notFull, notEmpty;

    @SuppressWarnings("unchecked")
    public BoundedBufferQueue(int size) {
        buffer = (T[]) new Object[size];
        head = tail = count = 0;
        lock = new ReentrantLock();
        notFull = lock.newCondition();
        notEmpty = lock.newCondition();
    }

    // Adds an item to the buffer, blocks if the buffer is full
    public void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (count == buffer.length) {
                notFull.await(); // Wait until the buffer is not full
            }
            buffer[tail] = item;
            tail = (tail + 1) % buffer.length;
            count++;
            notEmpty.signal(); // Signal that the buffer is not empty
        } finally {
            lock.unlock();
        }
    }

    // Removes and returns an item from the buffer, blocks if the buffer is empty
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await(); // Wait until the buffer is not empty
            }
            T item = buffer[head];
            head = (head + 1) % buffer.length;
            count--;
            notFull.signal(); // Signal that the buffer is not full
            return item;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        BoundedBufferQueue<Integer> buffer = new BoundedBufferQueue<>(5);

        // Producer thread
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    buffer.put(i);
                    System.out.println("Produced: " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumer thread
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    int value = buffer.take();
                    System.out.println("Consumed: " + value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}

Detailed Explanation:

  1. Initialization:
    • The buffer is initialized with a fixed size.
    • head and tail pointers manage the front and rear of the queue.
    • count keeps track of the number of elements in the buffer.
    • lock is a ReentrantLock to ensure mutual exclusion.
    • notFull and notEmpty are Condition instances used to signal when the buffer is not full and not empty, respectively.
  2. put(T item):
    • Acquires the lock using lock.lock().
    • Checks if the buffer is full (count == buffer.length). If full, it waits (notFull.await()).
    • Adds the item to the buffer at the tail index.
    • Updates the tail index and increments the count.
    • Signals that the buffer is not empty (notEmpty.signal()).
    • Releases the lock using lock.unlock().
  3. take():
    • Acquires the lock using lock.lock().
    • Checks if the buffer is empty (count == 0). If empty, it waits (notEmpty.await()).
    • Retrieves the item from the buffer at the head index.
    • Updates the head index and decrements the count.
    • Signals that the buffer is not full (notFull.signal()).
    • Releases the lock using lock.unlock().
  4. Main Method:
    • Creates a bounded buffer with a capacity of 5.
    • Starts a producer thread that adds 10 items to the buffer.
    • Starts a consumer thread that retrieves 10 items from the buffer.
  • This implementation showcases how to use ReentrantLock and Condition to manage a bounded buffer queue, ensuring that producers and consumers are properly synchronized.

It looks like the question is about implementing concurrent methods for a class that handles sending and receiving requests, similar to a bounded buffer or queue system. The goal might be to ensure that these operations are thread-safe and manage synchronization between producing (sending) and consuming (receiving) requests.

Possible Concurrency Question

Question: Implement a backend class that handles sending and receiving requests using a bounded buffer. Complete the sendRequest and receiveRequest methods to achieve the following requirements:

  • The sendRequest method should add a request to the buffer if there is space available, or block if the buffer is full.
  • The receiveRequest method should remove a request from the buffer if available, or block if the buffer is empty.
  • Ensure that the operations are thread-safe and handle synchronization between threads properly.

Implementation Steps

  1. Define the Class Structure: Create a class with a buffer to hold the requests, and synchronization primitives to manage concurrent access.
  2. Buffer Initialization: Initialize the buffer with a fixed size.
  3. Synchronization Mechanisms: Use ReentrantLock for mutual exclusion and Condition objects to manage the buffer’s full and empty states.
  4. Implement sendRequest Method: Ensure it adds requests to the buffer and handles blocking when the buffer is full.
  5. Implement receiveRequest Method: Ensure it removes requests from the buffer and handles blocking when the buffer is empty.

Example Code

Here’s how you might implement the class:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class RequestBuffer<T> {
    private final T[] buffer;
    private int head, tail, count;
    private final Lock lock;
    private final Condition notFull, notEmpty;

    @SuppressWarnings("unchecked")
    public RequestBuffer(int size) {
        buffer = (T[]) new Object[size];
        head = tail = count = 0;
        lock = new ReentrantLock();
        notFull = lock.newCondition();
        notEmpty = lock.newCondition();
    }

    // Method to send (produce) a request
    public void sendRequest(T request) throws InterruptedException {
        lock.lock();
        try {
            while (count == buffer.length) {
                notFull.await(); // Wait until the buffer is not full
            }
            buffer[tail] = request;
            tail = (tail + 1) % buffer.length;
            count++;
            notEmpty.signal(); // Signal that the buffer is not empty
        } finally {
            lock.unlock();
        }
    }

    // Method to receive (consume) a request
    public T receiveRequest() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await(); // Wait until the buffer is not empty
            }
            T request = buffer[head];
            head = (head + 1) % buffer.length;
            count--;
            notFull.signal(); // Signal that the buffer is not full
            return request;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        RequestBuffer<String> requestBuffer = new RequestBuffer<>(5);

        // Producer thread
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    requestBuffer.sendRequest("Request " + i);
                    System.out.println("Sent: Request " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumer thread
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    String request = requestBuffer.receiveRequest();
                    System.out.println("Received: " + request);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}

Detailed Explanation:

  1. Class Initialization:
    • The buffer is initialized to a fixed size.
    • head and tail pointers manage the start and end of the queue.
    • count tracks the number of elements in the buffer.
    • lock ensures mutual exclusion.
    • notFull and notEmpty are conditions used to signal the state of the buffer.
  2. sendRequest(T request):
    • Acquires the lock.
    • Checks if the buffer is full. If it is, the method waits (notFull.await()).
    • Adds the request to the buffer.
    • Updates the tail index and increments count.
    • Signals that the buffer is not empty (notEmpty.signal()).
    • Releases the lock.
  3. receiveRequest():
    • Acquires the lock.
    • Checks if the buffer is empty. If it is, the method waits (notEmpty.await()).
    • Retrieves the request from the buffer.
    • Updates the head index and decrements count.
    • Signals that the buffer is not full (notFull.signal()).
    • Releases the lock.

This implementation ensures that the sendRequest and receiveRequest methods are thread-safe and properly synchronized, managing the state of the buffer and coordinating producer and consumer threads.

Explanation of the Question

Question: Implement a WeightedCache class that manages items with associated weights. The cache should only keep items whose total weight does not exceed a specified maximum total weight (maxTotalWeight). You need to complete the sendRequest and receiveRequest methods to handle concurrent access to the cache in a thread-safe manner.

Strategy

  1. Class Initialization:
    • Define a fixed-size buffer (maxTotalWeight) for the cache.
    • Use ReentrantLock to manage mutual exclusion and ensure thread safety.
    • Use Condition objects to manage the full and empty states of the buffer.
  2. Data Structures:
    • Use a HashMap to store the cache items.
    • Use a PriorityQueue to manage the weights of the items for efficient access.
  3. Concurrency Management:
    • Implement the put method to add items to the cache. It should wait if adding the item exceeds maxTotalWeight.
    • Implement the get method to retrieve items from the cache in a thread-safe manner.

Code with Comments

import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Class to represent the weighted cache
public class WeightedCache {
    private final int maxTotalWeight; // Maximum allowed total weight for the cache
    private int currentWeight; // Current total weight of items in the cache
    private final Map<String, CacheItem> cache; // Map to store cache items
    private final PriorityQueue<CacheItem> weightQueue; // Priority queue to manage item weights
    private final Lock lock; // Lock for mutual exclusion
    private final Condition notFull; // Condition to wait if the cache is full

    // Cache item class to store key, value, and weight
    private static class CacheItem {
        String key;
        String value;
        int weight;

        CacheItem(String key, String value, int weight) {
            this.key = key;
            this.value = value;
            this.weight = weight;
        }
    }

    // Constructor to initialize the weighted cache
    public WeightedCache(int maxTotalWeight) {
        this.maxTotalWeight = maxTotalWeight;
        this.currentWeight = 0;
        this.cache = new HashMap<>();
        this.weightQueue = new PriorityQueue<>((a, b) -> Integer.compare(a.weight, b.weight));
        this.lock = new ReentrantLock();
        this.notFull = lock.newCondition();
    }

    // Method to get the value associated with a key in the cache
    public String get(String key) {
        lock.lock();
        try {
            CacheItem item = cache.get(key);
            return item == null ? null : item.value;
        } finally {
            lock.unlock();
        }
    }

    // Method to put a key-value pair in the cache with a specified weight
    public void put(String key, String value, int weight) throws InterruptedException {
        lock.lock();
        try {
            // Wait if adding the new item would exceed the max total weight
            while (currentWeight + weight > maxTotalWeight) {
                notFull.await();
            }
            // Remove the existing item if the key is already in the cache
            if (cache.containsKey(key)) {
                removeItem(key);
            }
            // Add the new item to the cache and priority queue
            CacheItem newItem = new CacheItem(key, value, weight);
            cache.put(key, newItem);
            weightQueue.add(newItem);
            currentWeight += weight;
            // Signal other threads that they may proceed
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Helper method to remove an item from the cache
    private void removeItem(String key) {
        CacheItem item = cache.remove(key);
        if (item != null) {
            weightQueue.remove(item);
            currentWeight -= item.weight;
        }
    }

    public static void main(String[] args) {
        WeightedCache cache = new WeightedCache(10);

        // Producer thread to add items to the cache
        Thread producer = new Thread(() -> {
            try {
                cache.put("key1", "value1", 3);
                System.out.println("Put key1: value1, weight 3");
                cache.put("key2", "value2", 4);
                System.out.println("Put key2: value2, weight 4");
                cache.put("key3", "value3", 5);
                System.out.println("Put key3: value3, weight 5");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumer thread to retrieve items from the cache
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(1000); // Allow producer to add items first
                System.out.println("Get key1: " + cache.get("key1"));
                System.out.println("Get key2: " + cache.get("key2"));
                System.out.println("Get key3: " + cache.get("key3"));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}

Detailed Explanation of Code with Comments:

  1. Class Initialization:
    • maxTotalWeight: Defines the maximum weight allowed for items in the cache.
    • currentWeight: Tracks the total weight of items currently in the cache.
    • cache: A HashMap to store the key-value pairs.
    • weightQueue: A PriorityQueue to manage and prioritize items based on their weight.
    • lock: A ReentrantLock to ensure that only one thread can modify the cache at a time.
    • notFull: A Condition used to signal when space becomes available in the cache.
  2. CacheItem Class:
    • Represents an item in the cache with a key, value, and weight.
  3. Constructor:
    • Initializes the cache, priority queue, lock, and condition.
  4. get(String key):
    • Acquires the lock using lock.lock().
    • Retrieves the item from the cache if it exists.
    • Releases the lock using lock.unlock().
  5. put(String key, String value, int weight):
    • Acquires the lock using lock.lock().
    • Waits (notFull.await()) if adding the new item would exceed the maxTotalWeight.
    • Removes the existing item if the key is already in the cache.
    • Adds the new item to the cache and priority queue.
    • Updates the currentWeight.
    • Signals other waiting threads (notFull.signalAll()) that space may be available in the cache.
    • Releases the lock using lock.unlock().
  6. removeItem(String key):
    • Removes the item from the cache and priority queue.
    • Updates the currentWeight.
  7. Main Method:
    • Creates a WeightedCache with a capacity of 10.
    • Starts a producer thread that adds items to the cache.
    • Starts a consumer thread that retrieves items from the cache.

This implementation ensures that the cache operations are thread-safe and handle concurrent access properly, making efficient use of ReentrantLock and Condition to manage synchronization.

References