Concepts • Concurrency
- Overview
- Threads or Platform Thread
- Start Multiple Threads
- OS Thread vs Platform Thread vs Virtual Thread
- Memory
- Synchronized Exchanger Class
- Java Volatile
- Java ThreadLocal
- Race Conditions in Multithreading
- Concurrency vs Parallelism
- Critical Section
- Deadlock
- Threadpool
- Java Lock
- Executor Service
- Blocking Queue
- Consumer Pattern
- Thread Congestion
- Thread Signal
- Real-time use cases
- Implement scheduled thread pool executor
- Thread-safe LRU Cache
- Implement custom Executor class
- 1. Thread Synchronization with
wait
/notify
/notifyAll
- 2. Spinlock vs. Mutex
- 3. Hybrid Mutex
- 4. ReentrantLock & Condition
- 5. Classic Producer
- 6. Two-Lock Queue
- 7. Non-blocking Queue Implementation
- 8. Semaphore
- 9. HashMap Implementation
- Bounded Buffer Queue with
Condition
andReentrantLock
- Detailed Explanation:
- Pros and Cons of Mutex/Condition Variable
- Code Example
- Detailed Explanation:
- Possible Concurrency Question
- Implementation Steps
- Example Code
- Detailed Explanation:
- Explanation of the Question
- Strategy
- Code with Comments
- Detailed Explanation of Code with Comments:
- References
Overview
- In this primer, we will explore the concept of concurrency in computer science, providing examples in both Java and Python. Concurrency is a crucial topic, particularly in the context of machine learning (ML) infrastructure, where efficient resource management and parallel processing are key to handling large-scale data and computational tasks.
- Below, we see the overall lifecycle of a thread (source)
Understanding Multithreading and Multitasking
- Multithreading: This technique involves running multiple threads within a single application, allowing different tasks to be executed simultaneously. In a machine learning context, multithreading can be used to parallelize operations such as data preprocessing, model training, and real-time data analysis. For example, while one thread handles data loading, another can preprocess the data, and yet another can perform model evaluation. This parallelism not only speeds up the computation but also optimizes CPU usage, especially during I/O-bound tasks like reading from disk or network operations.
-
Source: YouTube Video
-
Multitasking: This involves running multiple applications concurrently, each potentially comprising several threads. In ML infrastructure, multitasking enables the simultaneous execution of various services, such as data ingestion, model serving, and monitoring. This concurrent execution is vital in production environments where real-time inference and continuous integration/deployment (CI/CD) processes must run smoothly and efficiently.
-
Concurrency in ML Infrastructure: Concurrency allows ML infrastructure to efficiently handle multiple simultaneous tasks, such as data ingestion, preprocessing, model training, and serving. AWS components like EC2, Lambda, and SageMaker manage these tasks concurrently, allowing the infrastructure to scale based on demand.
-
In the following sections, we will delve deeper into these concepts, exploring their implications and applications in modern computing, particularly in the context of machine learning systems.
Threads or Platform Thread
- Create a New Thread and Start It:
- To create a new thread in Java, you can define a class that extends
Thread
and override therun
method. This method contains the code that should run in the new thread. The thread is started using thestart
method, which in turn calls therun
method.
class MyThread extends Thread { public void run() { System.out.println("Thread is running."); } } public class Main { public static void main(String[] args) { MyThread t1 = new MyThread(); t1.start(); } }
Pros:
- Simple and straightforward to use.
- Direct access to the
Thread
class methods.
Cons:
- Extending
Thread
means the class cannot extend any other class, which limits flexibility. - Not recommended for large projects where multiple inheritance might be required.
- To create a new thread in Java, you can define a class that extends
- Java Lambda Expression Code:
- In Java 8 and later, you can use lambda expressions with the
Runnable
interface to define the task for a thread. This approach is more concise and readable, especially for simple tasks.
Runnable r = () -> { System.out.println("Thread using lambda expression is running."); }; new Thread(r).start();
Pros:
- More concise and easier to read, especially for short tasks.
- Does not require creating a separate class.
Cons:
- Less control over thread-specific methods and behaviors.
- Not ideal for complex threading logic where more control is needed.
- In Java 8 and later, you can use lambda expressions with the
- Java Subclass of Thread:
- Similar to the first method, but here we emphasize creating a subclass specifically for threading logic.
class MyThreadSubclass extends Thread { public void run() { System.out.println("Thread subclass is running."); } } public class Main { public static void main(String[] args) { MyThreadSubclass t1 = new MyThreadSubclass(); t1.start(); } }
Pros:
- Encapsulates thread logic within a specialized subclass.
- Useful for creating complex thread-specific behaviors.
Cons:
- Same limitations as extending the
Thread
class directly, such as single inheritance.
- Java Anonymous Class:
- This method involves using an anonymous inner class to define the thread’s behavior, providing a quick way to create a thread without needing a named class.
Thread t = new Thread() { public void run() { System.out.println("Thread using anonymous class is running."); } }; t.start();
Pros:
- Useful for quick and temporary threads.
- No need for a separate class file.
Cons:
- Can become messy and hard to read with more complex logic.
- Limited reusability and difficult to debug.
- Java Runnable Interface Implementation:
- Implementing the
Runnable
interface is the most flexible and recommended approach, especially in large projects. TheRunnable
interface defines a single method,run
, which is implemented to contain the code to be executed in the thread.
class MyRunnable implements Runnable { public void run() { System.out.println("Runnable thread is running."); } } public class Main { public static void main(String[] args) { MyRunnable r = new MyRunnable(); Thread t = new Thread(r); t.start(); } }
Pros:
- Does not restrict inheritance, allowing the class to extend other classes if needed.
- Decouples task logic from thread management, making it more reusable and easier to manage.
Cons:
- Slightly more verbose than using lambda expressions for simple tasks.
- Implementing the
-
The choice of method depends on the specific needs of your application:
- For simple and quick implementations, lambda expressions or anonymous classes are convenient.
- For more complex threading logic or where thread-specific behaviors are needed, subclassing Thread might be appropriate.
-
The Runnable interface is generally recommended for its flexibility and separation of concerns, especially in larger projects or when needing to extend other classes.
- Overall, the Runnable interface approach is often considered the best practice due to its flexibility and maintainability.
Start Multiple Threads
- Java Code for 2 Threads with Runnable:
- Using the
Runnable
interface, you can create multiple threads by defining tasks within therun
method.
class Task implements Runnable { private String name; public Task(String name) { this.name = name; } public void run() { for(int i = 0; i < 5; i++) { System.out.println(name + " is running, iteration: " + i); } } } public class Main { public static void main(String[] args) { Thread t1 = new Thread(new Task("Thread 1")); Thread t2 = new Thread(new Task("Thread 2")); t1.start(); t2.start(); } }
- Using the
- Thread.sleep:
- The
Thread.sleep
method pauses the execution of the current thread for a specified time in milliseconds.
try { Thread.sleep(1000); // Sleep for 1 second } catch (InterruptedException e) { e.printStackTrace(); }
Pros:
- Simple way to delay thread execution.
Cons:
- May not be precise due to system scheduler limitations.
- Can lead to inefficient resource use if overused.
- The
- Manage Thread Stop Yourself for Runnable:
- To stop a thread gracefully, you can use a flag that the
Runnable
checks periodically.
class StoppableTask implements Runnable { private volatile boolean running = true; public void run() { while (running) { System.out.println("Task is running..."); try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } System.out.println("Task stopped."); } public void stop() { running = false; } } public class Main { public static void main(String[] args) throws InterruptedException { StoppableTask task = new StoppableTask(); Thread t = new Thread(task); t.start(); Thread.sleep(2000); // Let the task run for a bit task.stop(); // Request the task to stop t.join(); // Wait for the task to finish } }
- To stop a thread gracefully, you can use a flag that the
- Explain Java Daemon Thread, Show Code:
- A daemon thread runs in the background and does not prevent the JVM from exiting once all user threads have finished.
public class DaemonThreadExample { public static void main(String[] args) { Thread daemonThread = new Thread(() -> { while (true) { System.out.println("Daemon thread is running..."); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } }); daemonThread.setDaemon(true); daemonThread.start(); System.out.println("Main thread is finished."); } }
Pros:
- Useful for background tasks (e.g., garbage collection, monitoring).
Cons:
- May not finish execution if JVM exits before the thread completes.
- Thread.join Explain:
- The
join
method allows one thread to wait for the completion of another.
Thread t = new Thread(() -> { System.out.println("Thread started."); }); t.start(); try { t.join(); // Wait for thread t to finish System.out.println("Thread finished."); } catch (InterruptedException e) { e.printStackTrace(); }
Pros:
- Ensures a thread completes before proceeding, useful in synchronization.
Cons:
- Can cause delays if the joined thread takes a long time to finish.
- The
- Java Has OS Level Threads, Python Has…:
- Java:
- Java threads are typically mapped to native OS threads, managed by the JVM.
- Pros: Efficient CPU utilization, leveraging multi-core processors.
- Cons: Requires significant system resources; limited by the number of OS threads.
- Python:
- Python uses the Global Interpreter Lock (GIL) in CPython, which allows only one thread to execute Python bytecode at a time.
- Pros: Simplifies memory management, safe concurrent access.
- Cons: Limits true parallel execution of threads, better suited for I/O-bound rather than CPU-bound tasks.
- Java:
- These Require Stack Space:
- Both Java and Python threads require stack space for each thread’s execution context.
- Pros: Allows separate execution contexts.
- Cons: Consumes memory; too many threads can lead to resource exhaustion.
- Java Project LOOM:
- Aims to introduce lightweight, user-mode threads (fibers) to the JVM, making concurrent programming easier and more efficient.
- Pros: Reduced resource overhead, potentially thousands of fibers per OS thread.
- Cons: Still in development, may require changes in how concurrency is approached.
- JVM:
- The Java Virtual Machine (JVM) manages Java threads, providing platform independence and handling thread scheduling, memory management, and more.
- Pros: Cross-platform capabilities, robust thread management.
- Cons: Potential overhead compared to native code execution, requires tuning for optimal performance.
- This section outlines various threading concepts and their implementation in Java, including managing multiple threads, understanding daemon threads, and comparing Java and Python’s threading models.
Virtual Threads
- Virtual threads, as depicted above (source), are a concept in concurrent programming, particularly relevant in modern programming languages and runtime environments.
- Virtual threads are a lighter-weight abstraction, often managed at the language runtime level rather than by the operating system. This can also be referred to as user-mode threads or green threads in some contexts.
- They are designed to be much less resource-intensive compared to traditional threads, allowing the creation of a much larger number of them.
- Virtual threads can be scheduled and managed by the runtime environment, which can be more efficient than relying solely on the OS for context switching and scheduling.
- In this example, Executors.newVirtualThreadPerTaskExecutor() creates an executor that uses virtual threads. Each task is executed in its own virtual thread, and the virtual threads are much lighter than traditional OS threads.
import java.util.concurrent.Executors;
public class VirtualThreadsExample {
public static void main(String[] args) {
var executor = Executors.newVirtualThreadPerTaskExecutor();
for (int i = 0; i < 10; i++) {
executor.submit(() -> {
System.out.println("Running in virtual thread: " + Thread.currentThread());
try {
Thread.sleep(1000); // Simulate work
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.close(); // Properly shutdown the executor
}
}
- Platform Threads:
- Use When: CPU-bound tasks, low to moderate concurrency, requiring OS-level interactions, real-time requirements.
- Characteristics: Heavyweight, blocking operations block OS threads, managed by OS, higher context switching cost.
- Virtual Threads:
- Use When: High concurrency, I/O-bound applications, simplified concurrency management, resource efficiency.
- Characteristics: Lightweight, blocking operations do not block OS threads, managed by Java runtime, lower context switching cost.
Java (Using Project Loom for Virtual Threads)
Project Loom introduces virtual threads to the Java ecosystem, allowing lightweight concurrency. Below is a simple example:
import java.util.concurrent.Executors;
public class VirtualThreadsExample {
public static void main(String[] args) {
var executor = Executors.newVirtualThreadPerTaskExecutor();
for (int i = 0; i < 10; i++) {
executor.submit(() -> {
System.out.println("Running in virtual thread: " + Thread.currentThread());
try {
Thread.sleep(1000); // Simulate work
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.close(); // Properly shutdown the executor
}
}
- In this example,
Executors.newVirtualThreadPerTaskExecutor()
creates an executor that uses virtual threads. Each task is executed in its own virtual thread, and the virtual threads are much lighter than traditional OS threads.
Python (Using asyncio)
Python does not have native virtual threads like Java’s Project Loom, but asyncio
provides a way to write concurrent code using coroutines, which can be thought of as lightweight “threads” managed by the Python runtime.
import asyncio
async def task():
print(f"Running in task: {asyncio.current_task().get_name()}")
await asyncio.sleep(1) # Simulate work
async def main():
tasks = [task() for _ in range(10)]
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())
- In this Python example,
asyncio
is used to run multiple coroutines concurrently. Theasyncio.gather()
function runs all the coroutines concurrently, andasyncio.run()
is used to start the event loop.
Key Differences and Similarities:
- Java Virtual Threads: With Project Loom, Java introduces virtual threads that are lightweight and managed by the Java runtime. They allow for a synchronous programming style while still achieving high concurrency.
- Python asyncio: Python’s asyncio provides an asynchronous programming model using coroutines. While not exactly the same as Java’s virtual threads, they achieve a similar effect by allowing the Python runtime to manage the execution of concurrent tasks without the need for traditional threading.
OS Thread vs Platform Thread vs Virtual Thread
- The image below (source) displays the three tiers of threads in Java: OS Threads, Platform Threads, and Virtual Threads.
OS Threads
- Definition: OS threads are managed directly by the operating system. Each Java thread corresponds to an OS thread, which handles scheduling and execution.
- Pros:
- Leverage multi-core processors effectively.
- Full support from the operating system for features like scheduling and prioritization.
- Cons:
- High memory overhead due to stack space and context switching.
- Limited by the number of threads the OS can efficiently manage, leading to potential scalability issues.
Platform Threads
- Definition: Platform threads, also known as native threads, are the traditional Java threads managed by the JVM. They are essentially wrappers around OS threads, providing platform independence.
- Pros:
- Cross-platform compatibility and ease of use within the Java ecosystem.
- Utilizes the OS for low-level thread management, benefiting from native optimizations.
- Cons:
- Similar to OS threads, they are resource-intensive.
- Can suffer from context-switching overhead, which impacts performance, especially with a high number of threads.
Virtual Threads
- Definition: Virtual threads are a newer concept introduced with Project Loom in Java. They are lightweight, user-mode threads managed by the JVM rather than the OS.
- Pros:
- Low overhead, allowing for a large number of threads (potentially millions) without significant performance degradation.
- Easier and more efficient handling of concurrency, particularly in applications with high I/O-bound workloads.
- Cons:
- Still in development and not yet widely adopted or available in all Java environments.
- May require rethinking traditional concurrency patterns and practices.
Summary:
- OS and Platform Threads are similar in terms of functionality and overhead, with the primary distinction being that OS threads are directly managed by the operating system, while platform threads are managed by the JVM.
-
Virtual Threads offer a promising future for Java concurrency, with potential to vastly improve scalability and performance by significantly reducing the resource footprint of each thread.
- This breakdown provides an understanding of the different types of threads available in Java and their respective advantages and disadvantages.
Memory
How Java Threads Access Memory
- Memory Access Mechanism:
- Java threads access memory through the Java Virtual Machine (JVM), which manages memory allocation and garbage collection. Each thread has its own stack for storing local variables, method call information, and control flow. Shared objects are stored in the heap, which all threads can access.
Java Code
-
Code to Illustrate Memory Access:
// Class representing a shared resource accessed by multiple threads class SharedResource { private int counter = 0; // Synchronized method to safely increment the counter public synchronized void increment() { counter++; } // Method to retrieve the current value of the counter public int getCounter() { return counter; } } public class MemoryExample { public static void main(String[] args) { // SharedResource object that will be accessed by multiple threads SharedResource resource = new SharedResource(); // First thread that increments the counter Thread t1 = new Thread(() -> { for (int i = 0; i < 1000; i++) { resource.increment(); // Safely increment counter } }); // Second thread that also increments the counter Thread t2 = new Thread(() -> { for (int i = 0; i < 1000; i++) { resource.increment(); // Safely increment counter } }); // Start both threads t1.start(); t2.start(); try { // Wait for both threads to complete t1.join(); t2.join(); } catch (InterruptedException e) { e.printStackTrace(); } // Print the final value of the counter System.out.println("Final counter value: " + resource.getCounter()); } }
- In this example, the
SharedResource
class contains acounter
variable. Theincrement
method is synchronized to ensure thread-safe access when multiple threads modify the counter simultaneously.
- In this example, the
Thread and Thread Stack and Heap
- Thread:
- Each Java thread has its own stack, which is separate from other threads. This stack stores local variables, method call frames, and control information.
- Thread Stack:
- The thread stack is specific to each thread, ensuring that local variables and method calls are isolated between threads, maintaining thread safety for these variables.
- The stack holds primitive data types and references to objects in the heap.
- Heap:
- The heap is a shared memory area where objects are allocated. All threads in a Java application can access the heap, making it possible for threads to share objects and communicate.
- The JVM’s garbage collector manages heap memory, reclaiming space used by objects that are no longer in use.
Now How Does Memory Look on Hardware Architecture:
- CPU:
- Thread and CPU Registers:
- CPU registers are the fastest memory components and store immediate values for computations. Each thread has its own set of registers while running on a CPU core.
- Thread and CPU Registers:
- L1, L2, L3 Cache:
- L1 Cache:
- The smallest and fastest cache, located closest to the CPU cores, storing frequently accessed data and instructions.
- L2 Cache:
- Larger than L1 but slower, serving as a buffer between the L1 cache and the main memory.
- L3 Cache:
- The largest and slowest cache, shared among multiple cores, providing a last-level cache before accessing main memory.
- L1 Cache:
- RAM:
- Thread Stack:
- Part of RAM where each thread’s stack is stored, containing local variables and method call frames.
- Heap:
- The main area in RAM where all Java objects are allocated. The heap is shared among all threads, allowing for object sharing and communication between threads. The JVM manages the heap, handling object allocation and garbage collection.
- Thread Stack:
- This overview covers how Java threads interact with memory, including the roles of the thread stack, heap, and various hardware memory components.
Synchronized Exchanger Class
- Concept: A Synchronized Exchanger class is used to safely exchange data between two threads, ensuring that both threads synchronize their operations when exchanging data.
class SynchronizedExchanger {
private Object sharedData; // Shared data to be exchanged
private boolean isEmpty = true; // Flag to check if data is available
// Method to set data in the exchanger, synchronized to ensure thread safety
public synchronized void set(Object data) {
while (!isEmpty) {
try {
wait(); // Wait until the data is consumed
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore interrupt status
}
}
isEmpty = false;
this.sharedData = data;
notifyAll(); // Notify waiting threads that data is set
}
// Method to get data from the exchanger, synchronized to ensure thread safety
public synchronized Object get() {
while (isEmpty) {
try {
wait(); // Wait until the data is available
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore interrupt status
}
}
isEmpty = true;
Object data = sharedData;
notifyAll(); // Notify waiting threads that data has been consumed
return data;
}
}
public class ExchangerExample {
public static void main(String[] args) {
SynchronizedExchanger exchanger = new SynchronizedExchanger();
// Producer thread sets data in the exchanger
new Thread(() -> {
exchanger.set("Data from producer");
System.out.println("Producer set data.");
}).start();
// Consumer thread gets data from the exchanger
new Thread(() -> {
Object data = exchanger.get();
System.out.println("Consumer received: " + data);
}).start();
}
}
Java Volatile
- Concept: The
volatile
keyword ensures that updates to a variable are immediately visible to all threads, preventing caching of the variable’s value. - In Java, the volatile keyword is used to mark a variable as being stored in main memory. More precisely, it ensures that reads and writes to that variable are directly made to and from the main memory, rather than being cached locally in the CPU cache of a thread. This guarantees visibility of changes to variables across threads.
public class VolatileExample {
private volatile boolean running = true; // Volatile variable to ensure visibility
// Method to stop the thread safely
public void stopRunning() {
running = false; // Update running variable, immediately visible to other threads
}
// Method representing the thread's task
public void run() {
while (running) {
System.out.println("Thread is running");
}
System.out.println("Thread stopped");
}
public static void main(String[] args) throws InterruptedException {
VolatileExample example = new VolatileExample();
Thread t = new Thread(example::run);
t.start();
Thread.sleep(1000); // Let the thread run for a bit
example.stopRunning(); // Stop the thread
t.join(); // Wait for the thread to finish
}
}
Java ThreadLocal
- Concept:
ThreadLocal
provides thread-local variables, ensuring that each thread accessing such a variable has its own independent copy.
public class ThreadLocalExample {
// ThreadLocal variable with an initial value
private static final ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 1);
public static void main(String[] args) {
// First thread sets and gets the ThreadLocal variable
Thread t1 = new Thread(() -> {
threadLocal.set(100); // Set thread-specific value
System.out.println("Thread 1: " + threadLocal.get()); // Get thread-specific value
});
// Second thread sets and gets the ThreadLocal variable
Thread t2 = new Thread(() -> {
threadLocal.set(200); // Set thread-specific value
System.out.println("Thread 2: " + threadLocal.get()); // Get thread-specific value
});
t1.start();
t2.start();
}
}
Race Conditions in Multithreading
- Concept: Race conditions occur when two or more threads access shared data concurrently and attempt to modify it. This can lead to inconsistent or unexpected results.
class Counter {
private int count = 0; // Shared variable
// Method to increment the count, not synchronized (potential race condition)
public void increment() {
count++;
}
public int getCount() {
return count;
}
}
public class RaceConditionExample {
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();
// Thread 1 incrementing the counter
Thread t1 = new Thread(counter::increment);
// Thread 2 incrementing the counter
Thread t2 = new Thread(counter::increment);
t1.start();
t2.start();
t1.join();
t2.join();
// Output may not be as expected due to race condition
System.out.println("Final count: " + counter.getCount());
}
}
Concurrency vs Parallelism
- Concept:
- Concurrency: Making progress on more than one task by interleaving execution, not necessarily at the same time.
- Parallelism: Executing multiple tasks simultaneously using multiple processors or cores.
public class ConcurrencyParallelismExample {
public static void main(String[] args) {
// Task 1 for demonstrating concurrency
Runnable task1 = () -> {
for (int i = 0; i < 5; i++) {
System.out.println("Task 1 - Count: " + i);
try {
Thread.sleep(500); // Simulate time-consuming work
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// Task 2 for demonstrating concurrency
Runnable task2 = () -> {
for (int i = 0; i < 5; i++) {
System.out.println("Task 2 - Count: " + i);
try {
Thread.sleep(500); // Simulate time-consuming work
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// Running tasks in parallel using separate threads
Thread thread1 = new Thread(task1);
Thread thread2 = new Thread(task2);
thread1.start();
thread2.start();
}
}
Critical Section
- Concept:
- Atomic: Operations that appear indivisible; no other operations can see an intermediate state.
- Synchronized Block: A block of code that is synchronized on a given object, ensuring that only one thread can execute it at a time.
public class CriticalSectionExample {
private int count = 0;
// Synchronized method to increment the count
public synchronized void increment() {
count++; // Only one thread can execute this at a time
}
public int getCount() {
return count;
}
public static void main(String[] args) throws InterruptedException {
CriticalSectionExample example = new CriticalSectionExample();
Thread t1 = new Thread(example::increment);
Thread t2 = new Thread(example::increment);
t1.start();
t2.start();
t1.join();
t2.join();
// Print final count (expected to be 2 if no race condition occurs)
System.out.println("Final count: " + example.getCount());
}
}
Deadlock
- Concept: A situation where two or more threads are blocked forever, each waiting on the other to release a resource.
public class DeadlockExample {
private final Object lock1 = new Object(); // Lock 1
private final Object lock2 = new Object(); // Lock 2
// Method that acquires lock1 and then lock2
public void method1() {
synchronized (lock1) {
System.out.println("Method1 acquired lock1");
synchronized (lock2) {
System.out.println("Method1 acquired lock2");
}
}
}
// Method that acquires lock2 and then lock1
public void method2() {
synchronized (lock2) {
System.out.println("Method2 acquired lock2");
synchronized (lock1) {
System.out.println("Method2 acquired lock1");
}
}
}
public static void main(String[] args) {
DeadlockExample example = new DeadlockExample();
// Thread 1 attempts to lock lock1 then lock2
new Thread(example::method1).start();
// Thread 2 attempts to lock lock2 then lock1
new Thread(example::method2).start();
}
}
Threadpool
- Concept: A pool of pre-instantiated reusable threads. A thread pool is used to manage a set of worker threads and to manage the execution of tasks.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExample {
public static void main(String[] args) {
// Create a thread pool with 3 threads
ExecutorService executor = Executors.newFixedThreadPool(3);
// Define tasks to be executed by the thread pool
Runnable task1 = () -> System.out.println("Task 1 executed");
Runnable task2 = () -> System.out.println("Task 2 executed");
Runnable task3 = () -> System.out.println("Task 3 executed");
// Submit tasks to the thread pool
executor.execute(task1);
executor.execute(task2);
executor.execute(task3);
// Shutdown the executor service gracefully
executor.shutdown();
}
}
Java Lock
- Concept: Provides more sophisticated thread synchronization than `
synchronized` blocks. It offers features like tryLock() and lockInterruptibly().
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockExample {
private final Lock lock = new ReentrantLock(); // Lock instance
private int count = 0;
// Method to increment the count with explicit locking
public void increment() {
lock.lock(); // Acquire the lock
try {
count++; // Critical section
} finally {
lock.unlock(); // Ensure the lock is released
}
}
public int getCount() {
return count;
}
public static void main(String[] args) throws InterruptedException {
LockExample example = new LockExample();
Thread t1 = new Thread(example::increment);
Thread t2 = new Thread(example::increment);
t1.start();
t2.start();
t1.join();
t2.join();
// Print final count
System.out.println("Final count: " + example.getCount());
}
}
Executor Service
- Concept: A framework for managing a pool of threads, handling the scheduling and execution of asynchronous tasks.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExecutorServiceExample {
public static void main(String[] args) {
// Create an ExecutorService with a fixed thread pool of 2 threads
ExecutorService executor = Executors.newFixedThreadPool(2);
// Define tasks to be executed
Runnable task1 = () -> System.out.println("Task 1 executed");
Runnable task2 = () -> System.out.println("Task 2 executed");
// Submit tasks to the executor
executor.submit(task1);
executor.submit(task2);
// Shutdown the executor service
executor.shutdown();
}
}
Blocking Queue
- Concept: A thread-safe queue that supports operations that wait for the queue to become non-empty when retrieving and wait for space to become available in the queue when storing.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueExample {
public static void main(String[] args) {
// Create a blocking queue with a capacity of 10
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
// Producer thread that adds elements to the queue
Thread producer = new Thread(() -> {
try {
queue.put("Element"); // Add an element to the queue
System.out.println("Element added");
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Handle interrupt
}
});
// Consumer thread that takes elements from the queue
Thread consumer = new Thread(() -> {
try {
String element = queue.take(); // Take an element from the queue
System.out.println("Element removed: " + element);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Handle interrupt
}
});
producer.start();
consumer.start();
}
}
Consumer Pattern
- Concept: A design pattern where a producer creates data and a consumer processes it, typically using a queue to manage the data flow.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerExample {
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(5); // Create a blocking queue
// Producer thread to produce data
new Thread(() -> {
try {
String[] data = {"one", "two", "three", "four", "five"};
for (String element : data) {
queue.put(element); // Put elements into the queue
System.out.println("Produced: " + element);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// Consumer thread to consume data
new Thread(() -> {
try {
while (true) {
String element = queue.take(); // Take elements from the queue
System.out.println("Consumed: " + element);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
Thread Congestion
- Concept: Occurs when too many threads are competing for limited resources, leading to performance degradation.
public class ThreadCongestionExample {
public static void main(String[] args) {
// Task to be run by many threads
Runnable task = () -> {
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + " is running");
try {
Thread.sleep(100); // Simulate some work
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Handle interrupt
}
}
};
// Creating too many threads can lead to congestion
for (int i = 0; i < 100; i++) {
new Thread(task).start();
}
}
}
Thread Signal
- Concept: Mechanisms such as
wait()
,notify()
, andnotifyAll()
are used for thread communication, allowing threads to signal each other for synchronization purposes.
public class ThreadSignalExample {
private static final Object lock = new Object(); // Lock object for synchronization
public static void main(String[] args) {
// Thread that waits for a signal
Thread waitingThread = new Thread(() -> {
synchronized (lock) {
try {
System.out.println("Waiting thread waiting...");
lock.wait(); // Wait for a signal
System.out.println("Waiting thread resumed");
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Handle interrupt
}
}
});
// Thread that sends a signal
Thread notifyingThread = new Thread(() -> {
synchronized (lock) {
System.out.println("Notifying thread sending signal...");
lock.notify(); // Send a signal
}
});
waitingThread.start();
notifyingThread.start();
}
}
Real-time use cases
Implement scheduled thread pool executor
- Thread pool is a collection of pre-created reusable threads that can be used to execute tasks. A thread pool reuses existing threads to execute multiple tasks.
- The executor is a high-level interface for managing and controlling thread execution.
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledThreadPoolExecutorExample{
public static void main(String[] args){
ScheduledThreadPoolExecutorService scheduler = Executor.newScheduledThreadPool(2);
// Schedule a task to run after 5 sec delay
scheduler.schedule(() -> {
System.out.println("Task executed after 5 sec delay" + Thread.currentThread().getName());
}, 5, TimeUnit.SECONDS);
// Await termination of scheduler
try {
if (!scheduler.awaitTermination(25, TimeUnits.SECONDS)) {
scheduler.shutDownNow();
} catch (InterruptedException e){
scheduler.shutdownNow();
}
}
}
}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
public class CustomScheduledThreadPoolExecutor {
// Task queue for immediate execution tasks
private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
// Priority queue for scheduled tasks, sorted by execution time
private final PriorityBlockingQueue<ScheduledTask> scheduledTaskQueue = new PriorityBlockingQueue<>();
// Array of worker threads
private final Thread[] workers;
// Constructor to initialize the thread pool with a given size
public CustomScheduledThreadPoolExecutor(int poolSize) {
workers = new Thread[poolSize];
// Start worker threads
for (int i = 0; i < poolSize; i++) {
workers[i] = new Worker();
workers[i].start();
}
// Start the scheduler thread
new Scheduler().start();
}
// Method to schedule a one-time task after a delay
public void schedule(Runnable task, long delayMillis) {
scheduledTaskQueue.offer(new ScheduledTask(task, System.currentTimeMillis() + delayMillis));
}
// Method to schedule a repeating task with a fixed rate
public void scheduleAtFixedRate(Runnable task, long initialDelayMillis, long periodMillis) {
scheduledTaskQueue.offer(new ScheduledTask(task, System.currentTimeMillis() + initialDelayMillis, periodMillis));
}
// Method to shut down the executor
public void shutdown() {
for (Thread worker : workers) {
worker.interrupt(); // Interrupt each worker thread
}
}
// Worker thread class responsible for executing tasks from the task queue
private class Worker extends Thread {
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
// Take a task from the task queue and execute it
Runnable task = taskQueue.take();
task.run();
}
} catch (InterruptedException e) {
// Thread interrupted, exit gracefully
}
}
}
// Scheduler thread class responsible for moving tasks from the scheduled queue to the task queue
private class Scheduler extends Thread {
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
// Take the next scheduled task
ScheduledTask scheduledTask = scheduledTaskQueue.take();
long delay = scheduledTask.getExecutionTime() - System.currentTimeMillis();
if (delay > 0) {
// If the task is not ready to be executed, sleep for the remaining delay
Thread.sleep(delay);
}
// Move the task to the task queue for execution
taskQueue.offer(scheduledTask.getTask());
if (scheduledTask.isRepeating()) {
// If the task is repeating, schedule it again
scheduledTask.setExecutionTime(System.currentTimeMillis() + scheduledTask.getPeriod());
scheduledTaskQueue.offer(scheduledTask);
}
}
} catch (InterruptedException e) {
// Thread interrupted, exit gracefully
}
}
}
// Class representing a scheduled task with a comparable interface for priority queue ordering
private static class ScheduledTask implements Comparable<ScheduledTask> {
private final Runnable task;
private long executionTime;
private final long period;
// Constructor for one-time tasks
public ScheduledTask(Runnable task, long executionTime) {
this(task, executionTime, -1);
}
// Constructor for repeating tasks
public ScheduledTask(Runnable task, long executionTime, long period) {
this.task = task;
this.executionTime = executionTime;
this.period = period;
}
public Runnable getTask() {
return task;
}
public long getExecutionTime() {
return executionTime;
}
public void setExecutionTime(long executionTime) {
this.executionTime = executionTime;
}
public boolean isRepeating() {
return period > 0;
}
public long getPeriod() {
return period;
}
@Override
public int compareTo(ScheduledTask other) {
// Compare tasks based on their scheduled execution time
return Long.compare(this.executionTime, other.executionTime);
}
}
public static void main(String[] args) {
// Create a custom scheduled thread pool executor with 2 worker threads
CustomScheduledThreadPoolExecutor executor = new CustomScheduledThreadPoolExecutor(2);
// Schedule a one-time task to run after 5 seconds
executor.schedule(() -> System.out.println("Task executed after 5 seconds delay"), 5000);
// Schedule a periodic task to run every 3 seconds
executor.scheduleAtFixedRate(() -> System.out.println("Periodic task executed"), 0, 3000);
// Schedule a task to shut down the executor after 20 seconds
executor.schedule(() -> {
System.out.println("Shutting down executor");
executor.shutdown();
}, 20000);
}
}
Thread-safe LRU Cache
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ThreadSafeLRUCache<K, V> {
private final int capacity;
private final Map<K, Node<K, V>> map;
private final DoublyLinkedList<K, V> list;
private final Lock lock = new ReentrantLock();
// Constructor to initialize the LRU Cache with a given capacity
public ThreadSafeLRUCache(int capacity) {
this.capacity = capacity;
this.map = new HashMap<>();
this.list = new DoublyLinkedList<>();
}
// Method to get a value from the cache
public V get(K key) {
lock.lock();
try {
if (!map.containsKey(key)) {
return null; // Return null if the key is not present
}
Node<K, V> node = map.get(key);
list.moveToHead(node); // Move the accessed node to the head (most recently used)
return node.value;
} finally {
lock.unlock(); // Ensure the lock is released
}
}
// Method to put a key-value pair into the cache
public void put(K key, V value) {
lock.lock();
try {
if (map.containsKey(key)) {
Node<K, V> node = map.get(key);
node.value = value;
list.moveToHead(node); // Move the updated node to the head
} else {
if (map.size() == capacity) {
Node<K, V> tail = list.removeTail(); // Remove the least recently used item
map.remove(tail.key);
}
Node<K, V> newNode = new Node<>(key, value);
list.addToHead(newNode); // Add the new node to the head
map.put(key, newNode);
}
} finally {
lock.unlock(); // Ensure the lock is released
}
}
// Node class representing each entry in the doubly linked list
private static class Node<K, V> {
K key;
V value;
Node<K, V> prev;
Node<K, V> next;
Node(K key, V value) {
this.key = key;
this.value = value;
}
}
// Doubly linked list to maintain the order of usage
private static class DoublyLinkedList<K, V> {
private Node<K, V> head;
private Node<K, V> tail;
DoublyLinkedList() {
head = new Node<>(null, null);
tail = new Node<>(null, null);
head.next = tail;
tail.prev = head;
}
// Add a node to the head (most recently used)
void addToHead(Node<K, V> node) {
node.next = head.next;
node.prev = head;
head.next.prev = node;
head.next = node;
}
// Remove a node from the list
void removeNode(Node<K, V> node) {
node.prev.next = node.next;
node.next.prev = node.prev;
}
// Move a node to the head
void moveToHead(Node<K, V> node) {
removeNode(node);
addToHead(node);
}
// Remove the tail node (least recently used)
Node<K, V> removeTail() {
Node<K, V> res = tail.prev;
removeNode(res);
return res;
}
}
// Main method for testing the LRU Cache
public static void main(String[] args) {
ThreadSafeLRUCache<Integer, String> cache = new ThreadSafeLRUCache<>(2);
cache.put(1, "one");
cache.put(2, "two");
System.out.println(cache.get(1)); // prints "one"
cache.put(3, "three"); // evicts key 2
System.out.println(cache.get(2)); // prints "null"
cache.put(4, "four"); // evicts key 1
System.out.println(cache.get(1)); // prints "null"
System.out.println(cache.get(3)); // prints "three"
System.out.println(cache.get(4)); // prints "four"
}
}
Implement custom Executor class
- Details of the class
- Submit a task and specify an ETA, and run the task at the ETA
- Cancel the task (if the task has not yet started.)
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.HashMap;
import java.util.Map;
import java.time.Duration;
import java.time.Instant;
public class CustomExecutor {
// ScheduledExecutorService to handle the scheduling of tasks
private final ScheduledExecutorService scheduler;
// Map to store tasks with their IDs for easy cancellation
private final Map<Integer, ScheduledFuture<?>> tasks;
// Counter to generate unique task IDs
private int taskCounter;
// Lock object to ensure thread-safe operations on task map and counter
private final Object lock = new Object();
// Constructor to initialize the scheduler and task map
public CustomExecutor() {
this.scheduler = Executors.newScheduledThreadPool(1);
this.tasks = new HashMap<>();
this.taskCounter = 0;
}
/**
* Submit a task to be executed at a specified ETA.
* @param task The task to be executed.
* @param eta The estimated time of arrival (when the task should run).
* @return The unique ID of the scheduled task.
*/
public int submit(Runnable task, Instant eta) {
synchronized (lock) {
// Increment the task counter for unique task ID
taskCounter++;
int currentTaskId = taskCounter;
// Calculate the delay in milliseconds from now until ETA
long delay = Duration.between(Instant.now(), eta).toMillis();
// If ETA is in the past, execute the task immediately
if (delay < 0) {
delay = 0;
}
// Schedule the task with the calculated delay
ScheduledFuture<?> scheduledTask = scheduler.schedule(task, delay, TimeUnit.MILLISECONDS);
// Store the task in the map with its unique ID
tasks.put(currentTaskId, scheduledTask);
return currentTaskId;
}
}
/**
* Cancel a scheduled task before it starts.
* @param taskId The unique ID of the task to cancel.
* @return True if the task was successfully cancelled, false otherwise.
*/
public boolean cancel(int taskId) {
synchronized (lock) {
// Retrieve the scheduled task from the map
ScheduledFuture<?> scheduledTask = tasks.remove(taskId);
// If the task is found, attempt to cancel it
if (scheduledTask != null) {
return scheduledTask.cancel(false);
}
return false;
}
}
/**
* Shut down the executor, ensuring all tasks are properly handled.
*/
public void shutdown() {
scheduler.shutdown();
try {
// Wait for 1 second for existing tasks to complete
if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
// Force shutdown if tasks did not complete in time
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
// Handle interruption and force shutdown
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
// Main method for demonstration purposes
public static void main(String[] args) throws InterruptedException {
// Create an instance of CustomExecutor
CustomExecutor executor = new CustomExecutor();
// Define a simple task to be executed
Runnable task = () -> System.out.println("Task executed!");
// Set ETA to 10 seconds from now
Instant eta = Instant.now().plusSeconds(10);
// Submit the task and get the task ID
int taskId = executor.submit(task, eta);
System.out.println("Task scheduled with ID: " + taskId);
// Wait for 5 seconds before attempting to cancel the task
Thread.sleep(5000);
// Attempt to cancel the task
if (executor.cancel(taskId)) {
System.out.println("Task " + taskId + " cancelled.");
} else {
System.out.println("Failed to cancel task " + taskId + ".");
}
// Shut down the executor
executor.shutdown();
}
}
Sure, I’ll provide more detailed explanations and concise code examples for each topic.
1. Thread Synchronization with wait
/notify
/notifyAll
Explanation:
In Java, wait
, notify
, and notifyAll
are methods of the Object
class that are used for thread synchronization. These methods help coordinate the actions of multiple threads that share a common resource.
wait()
: This method causes the current thread to wait until another thread callsnotify()
ornotifyAll()
on the same object. The thread releases ownership of the monitor and waits.notify()
: This method wakes up a single thread that is waiting on the object’s monitor. If multiple threads are waiting, one of them is chosen to be awakened.notifyAll()
: This method wakes up all threads that are waiting on the object’s monitor.
These methods must be called within a synchronized context, meaning the thread must hold the monitor lock of the object on which the method is being called.
Code Example:
class SharedResource {
private int value;
private boolean available = false;
// Synchronized method for producing values
public synchronized void produce(int newValue) throws InterruptedException {
while (available) {
wait(); // Release the lock and wait
}
value = newValue;
available = true;
notifyAll(); // Notify all waiting threads
}
// Synchronized method for consuming values
public synchronized int consume() throws InterruptedException {
while (!available) {
wait(); // Release the lock and wait
}
available = false;
notifyAll(); // Notify all waiting threads
return value;
}
}
public class WaitNotifyExample {
public static void main(String[] args) {
SharedResource resource = new SharedResource();
// Producer thread
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
resource.produce(i);
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Consumer thread
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
int value = resource.consume();
System.out.println("Consumed: " + value);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
2. Spinlock vs. Mutex
Explanation:
- Spinlock: A spinlock is a lock where the thread attempting to acquire the lock repeatedly checks if the lock is available. This “spinning” continues until the lock becomes available.
- Advantages: Spinlocks can be more efficient than traditional locks if the wait time is expected to be very short because they avoid the overhead associated with putting a thread to sleep and waking it up.
- Disadvantages: Spinlocks can waste CPU cycles because they keep the CPU busy while waiting for the lock.
- Mutex (Mutual Exclusion): A mutex is a synchronization primitive that is used to prevent multiple threads from simultaneously executing critical sections of code which can lead to race conditions.
- Advantages: Mutexes do not consume CPU resources while waiting because the waiting threads are put to sleep.
- Disadvantages: Mutexes can incur context switching overhead, which can be expensive in terms of performance if the lock contention is high.
Spinlock Code Example:
import java.util.concurrent.atomic.AtomicBoolean;
public class SpinLock {
private final AtomicBoolean lock = new AtomicBoolean(false);
public void lock() {
// Spin-wait (busy-wait) until the lock is acquired
while (!lock.compareAndSet(false, true)) {
// Do nothing, just keep trying
}
}
public void unlock() {
lock.set(false);
}
public static void main(String[] args) {
SpinLock spinLock = new SpinLock();
// Example usage of SpinLock
Runnable task = () -> {
spinLock.lock();
try {
// critical section
System.out.println(Thread.currentThread().getName() + " acquired the lock");
} finally {
spinLock.unlock();
System.out.println(Thread.currentThread().getName() + " released the lock");
}
};
Thread t1 = new Thread(task, "Thread 1");
Thread t2 = new Thread(task, "Thread 2");
t1.start();
t2.start();
}
}
Mutex Code Example (using ReentrantLock):
import java.util.concurrent.locks.ReentrantLock;
public class MutexExample {
private final ReentrantLock lock = new ReentrantLock();
public void doWork() {
lock.lock(); // Acquire the lock
try {
// Critical section
System.out.println(Thread.currentThread().getName() + " acquired the lock");
} finally {
lock.unlock(); // Always release the lock in a finally block
System.out.println(Thread.currentThread().getName() + " released the lock");
}
}
public static void main(String[] args) {
MutexExample example = new MutexExample();
// Example usage of ReentrantLock
Runnable task = example::doWork;
Thread t1 = new Thread(task, "Thread 1");
Thread t2 = new Thread(task, "Thread 2");
t1.start();
t2.start();
}
}
3. Hybrid Mutex
Explanation:
A hybrid mutex combines the properties of spinlocks and traditional mutexes. It spins for a short period and then falls back to blocking if the lock is not acquired quickly. This approach can reduce the overhead of context switching while avoiding the CPU consumption of long spin-waits.
Code Example:
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
public class HybridMutex {
private final AtomicBoolean spinLock = new AtomicBoolean(false);
private final ReentrantLock lock = new ReentrantLock();
public void lock() {
// Try spin-lock first
for (int i = 0; i < 100; i++) {
if (spinLock.compareAndSet(false, true)) {
return;
}
}
// Fall back to blocking lock
lock.lock();
}
public void unlock() {
if (spinLock.get()) {
spinLock.set(false);
} else {
lock.unlock();
}
}
public static void main(String[] args) {
HybridMutex hybridLock = new HybridMutex();
// Example usage of HybridMutex
Runnable task = () -> {
hybridLock.lock();
try {
// critical section
System.out.println(Thread.currentThread().getName() + " acquired the lock");
} finally {
hybridLock.unlock();
System.out.println(Thread.currentThread().getName() + " released the lock");
}
};
Thread t1 = new Thread(task, "Thread 1");
Thread t2 = new Thread(task, "Thread 2");
t1.start();
t2.start();
}
}
4. ReentrantLock & Condition
Explanation:
- ReentrantLock: A reentrant mutual exclusion lock with the same basic behavior and semantics as the implicit monitor lock accessed using
synchronized
methods and statements, but with extended capabilities. - Condition: Provides a means for one thread to suspend execution until notified by another thread. Conditions must be associated with a lock and can be used for more complex thread synchronization than the traditional
wait
/notify
mechanism.
Code Example:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class SharedResource {
private int value;
private boolean available = false;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
public void produce(int newValue) throws InterruptedException {
lock.lock();
try {
while (available) {
condition.await(); // Wait until the resource is available
}
value = newValue;
available = true;
condition.signalAll(); // Notify all waiting threads
} finally {
lock.unlock();
}
}
public int consume() throws InterruptedException {
lock.lock();
try {
while (!available) {
condition.await(); // Wait until the resource is not available
}
available = false;
condition.signalAll(); // Notify all waiting threads
return value;
} finally {
lock.unlock();
}
}
}
public class ConditionExample {
public static void main(String[] args) {
SharedResource resource = new SharedResource();
// Producer thread
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
resource.produce(i);
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Consumer thread
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
int value = resource.consume();
System.out.println("Consumed: " + value);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
5. Classic Producer
/Consumer Bounded Buffer Queue
Explanation:
A BlockingQueue
is used to manage the producer-consumer relationship with automatic synchronization. ArrayBlockingQueue
is a bounded blocking queue backed by an array. This ensures thread-safe operations and handles synchronization internally.
Code Example:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
class Producer implements Runnable {
private final BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
queue.put(i); // Add item to the queue
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class Consumer implements Runnable {
private final BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
int value = queue.take(); // Remove item from the queue
System.out.println("Consumed: " + value);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public class BlockingQueueExample {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
new Thread(new Producer(queue)).start();
new Thread(new Consumer(queue)).start();
}
}
6. Two-Lock Queue
Explanation:
A two-lock queue uses separate locks for enqueue and dequeue operations to reduce contention and improve concurrency. This can help in scenarios where both operations are frequently used.
Code Example:
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;
class TwoLockQueue<E> {
private Node<E> head;
private Node<E> tail;
private final Lock enqLock = new ReentrantLock();
private final Lock deqLock = new ReentrantLock();
private final Condition notEmpty = deqLock.newCondition();
private static class Node<E> {
E item;
Node<E> next;
Node(E item) {
this.item = item;
}
}
public void enqueue(E item) {
enqLock.lock();
try {
Node<E> newNode = new Node<>(item);
if (tail == null) {
head = tail = newNode;
} else {
tail.next = newNode;
tail = newNode;
}
} finally {
enqLock.unlock();
}
deqLock.lock();
try {
notEmpty.signal(); // Signal that the queue is not empty
} finally {
deqLock.unlock();
}
}
public E dequeue() throws InterruptedException {
deqLock.lock();
try {
while (head == null) {
notEmpty.await(); // Wait until the queue is not empty
}
Node<E> node = head;
head = head.next;
if (head == null) {
tail = null;
}
return node.item;
} finally {
deqLock.unlock();
}
}
}
7. Non-blocking Queue Implementation
Explanation:
A non-blocking queue uses atomic operations to ensure thread safety without locks. Java provides ConcurrentLinkedQueue
for this purpose. This queue is thread-safe and uses a non-blocking algorithm to achieve high concurrency.
Code Example:
import java.util.concurrent.ConcurrentLinkedQueue;
public class NonBlockingQueueExample {
public static void main(String[] args) {
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
// Producer thread
new Thread(() -> {
for (int i = 0; i < 10; i++) {
queue.add(i); // Add item to the queue
System.out.println("Produced: " + i);
}
}).start();
// Consumer thread
new Thread(() -> {
for (int i = 0; i < 10; i++) {
Integer value;
while ((value = queue.poll()) == null) {
// Busy-wait
}
System.out.println("Consumed: " + value);
}
}).start();
}
}
8. Semaphore
Explanation:
A semaphore controls access to a shared resource by maintaining a set of permits. Threads acquire permits before accessing the resource and release them afterward. If no permits are available, the acquiring thread blocks until a permit is released.
Code Example:
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
private final Semaphore semaphore = new Semaphore(1); // Binary semaphore (mutex)
public void accessResource() throws InterruptedException {
semaphore.acquire(); // Acquire a permit
try {
// critical section
System.out.println("Resource accessed by " + Thread.currentThread().getName());
} finally {
semaphore.release(); // Release the permit
}
}
public static void main(String[] args) {
SemaphoreExample example = new SemaphoreExample();
Runnable task = () -> {
try {
example.accessResource();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};
new Thread(task).start();
new Thread(task).start();
}
}
9. HashMap Implementation
Explanation:
To make a HashMap
thread-safe using separate chaining, we can use ConcurrentHashMap
. This class uses fine-grained locking and non-blocking algorithms to achieve high concurrency. It divides the map into segments, each of which can be locked independently, allowing multiple threads to access different segments concurrently.
Code Example:
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
public class ConcurrentHashMapExample {
public static void main(String[] args) {
Map<String, Integer> map = new ConcurrentHashMap<>();
// Add key-value pairs to the map
map.put("one", 1);
map.put("two", 2);
// Retrieve value by key
System.out.println("Value for key 'one': " + map.get("one"));
// Remove a key-value pair
map.remove("two");
// Check if a key exists
System.out.println("Contains key 'two': " + map.containsKey("two"));
}
}
- These detailed explanations and code examples should provide a clearer understanding of each concurrency topic and how to implement them in Java.
Bounded Buffer Queue with Condition
and ReentrantLock
Explanation:
A bounded buffer queue is a data structure that allows multiple producers to add items to the queue and multiple consumers to remove items from the queue. The queue has a fixed size, and it blocks producers when the queue is full and consumers when the queue is empty.
To implement this using Condition
and ReentrantLock
, we will:
- Use a
ReentrantLock
to ensure mutual exclusion. - Use two
Condition
instances to manage the producer and consumer waiting conditions. - Use an array to store the buffer elements and manage the buffer indices.
Code Example:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedBufferQueue<T> {
private final T[] buffer;
private int head, tail, count;
private final Lock lock;
private final Condition notFull, notEmpty;
@SuppressWarnings("unchecked")
public BoundedBufferQueue(int size) {
buffer = (T[]) new Object[size];
head = tail = count = 0;
lock = new ReentrantLock();
notFull = lock.newCondition();
notEmpty = lock.newCondition();
}
// Adds an item to the buffer, blocks if the buffer is full
public void put(T item) throws InterruptedException {
lock.lock();
try {
while (count == buffer.length) {
notFull.await(); // Wait until the buffer is not full
}
buffer[tail] = item;
tail = (tail + 1) % buffer.length;
count++;
notEmpty.signal(); // Signal that the buffer is not empty
} finally {
lock.unlock();
}
}
// Removes and returns an item from the buffer, blocks if the buffer is empty
public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await(); // Wait until the buffer is not empty
}
T item = buffer[head];
head = (head + 1) % buffer.length;
count--;
notFull.signal(); // Signal that the buffer is not full
return item;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
BoundedBufferQueue<Integer> buffer = new BoundedBufferQueue<>(5);
// Producer thread
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
buffer.put(i);
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Consumer thread
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
int value = buffer.take();
System.out.println("Consumed: " + value);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
Detailed Explanation:
- Initialization:
- The buffer is initialized with a fixed size.
head
andtail
pointers manage the front and rear of the queue.count
keeps track of the number of elements in the buffer.lock
is aReentrantLock
to ensure mutual exclusion.notFull
andnotEmpty
areCondition
instances used to signal when the buffer is not full and not empty, respectively.
- put(T item):
- Acquires the lock.
- Checks if the buffer is full (
count == buffer.length
). If full, it waits (notFull.await()
). - Adds the item to the buffer at the
tail
index. - Updates the
tail
index and increments thecount
. - Signals that the buffer is not empty (
notEmpty.signal()
). - Releases the lock.
- take():
- Acquires the lock.
- Checks if the buffer is empty (
count == 0
). If empty, it waits (notEmpty.await()
). - Retrieves the item from the buffer at the
head
index. - Updates the
head
index and decrements thecount
. - Signals that the buffer is not full (
notFull.signal()
). - Releases the lock.
- Main Method:
- Creates a bounded buffer with a capacity of 5.
- Starts a producer thread that adds 10 items to the buffer.
- Starts a consumer thread that retrieves 10 items from the buffer.
- This implementation ensures that producers block when the buffer is full, and consumers block when the buffer is empty, maintaining synchronization between producer and consumer threads.
Pros and Cons of Mutex/Condition Variable
Mutex (Mutual Exclusion)
Pros:
- Thread Safety: Mutexes ensure that only one thread can access the critical section at a time, preventing race conditions.
- Blocking: When a thread attempts to acquire a mutex that is already locked, it gets put to sleep, freeing up the CPU for other tasks.
- ReentrantLock Features: With
ReentrantLock
, you get more control compared tosynchronized
blocks, such as the ability to interrupt a thread waiting for a lock, try acquiring the lock without blocking, and having multipleCondition
objects for finer control over synchronization.
Cons:
- Deadlocks: If not used carefully, mutexes can lead to deadlocks, where two or more threads are waiting for each other to release locks, causing the application to hang.
- Performance Overhead: The blocking and unblocking of threads involve context switches, which can be expensive in terms of performance.
- Complexity: Properly managing the acquisition and release of locks can make the code more complex and harder to maintain.
Condition Variables
Pros:
- Fine-grained Synchronization: Condition variables allow threads to wait for specific conditions to be met, providing more fine-grained control over thread synchronization.
- Wait and Notify: Threads can wait for a condition to become true (
await
), and other threads can signal when the condition has changed (signal
orsignalAll
), allowing for more efficient communication between threads. - Multiple Conditions: With
ReentrantLock
, you can create multipleCondition
objects for different conditions, making it easier to manage complex synchronization scenarios.
Cons:
- Complexity: Using condition variables correctly can be tricky. It requires a good understanding of how conditions and locks interact, which can increase the complexity of the code.
- Potential for Spurious Wakeups: Threads waiting on a condition can be woken up without a
signal
being called (known as a spurious wakeup), requiring the use of loops to recheck the condition. - Deadlocks and Missed Signals: Incorrect use of condition variables can lead to deadlocks or missed signals, where a thread might never be woken up because the signal was sent before the thread started waiting.
Code Example
Let’s look at a simple example that demonstrates the use of ReentrantLock
and Condition
for a bounded buffer queue.
Example Code with Detailed Comments
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
// Bounded buffer queue implementation
public class BoundedBufferQueue<T> {
private final T[] buffer;
private int head, tail, count;
private final Lock lock;
private final Condition notFull, notEmpty;
@SuppressWarnings("unchecked")
public BoundedBufferQueue(int size) {
buffer = (T[]) new Object[size];
head = tail = count = 0;
lock = new ReentrantLock();
notFull = lock.newCondition();
notEmpty = lock.newCondition();
}
// Adds an item to the buffer, blocks if the buffer is full
public void put(T item) throws InterruptedException {
lock.lock();
try {
while (count == buffer.length) {
notFull.await(); // Wait until the buffer is not full
}
buffer[tail] = item;
tail = (tail + 1) % buffer.length;
count++;
notEmpty.signal(); // Signal that the buffer is not empty
} finally {
lock.unlock();
}
}
// Removes and returns an item from the buffer, blocks if the buffer is empty
public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await(); // Wait until the buffer is not empty
}
T item = buffer[head];
head = (head + 1) % buffer.length;
count--;
notFull.signal(); // Signal that the buffer is not full
return item;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
BoundedBufferQueue<Integer> buffer = new BoundedBufferQueue<>(5);
// Producer thread
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
buffer.put(i);
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Consumer thread
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
int value = buffer.take();
System.out.println("Consumed: " + value);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
Detailed Explanation:
- Initialization:
- The buffer is initialized with a fixed size.
head
andtail
pointers manage the front and rear of the queue.count
keeps track of the number of elements in the buffer.lock
is aReentrantLock
to ensure mutual exclusion.notFull
andnotEmpty
areCondition
instances used to signal when the buffer is not full and not empty, respectively.
- put(T item):
- Acquires the lock using
lock.lock()
. - Checks if the buffer is full (
count == buffer.length
). If full, it waits (notFull.await()
). - Adds the item to the buffer at the
tail
index. - Updates the
tail
index and increments thecount
. - Signals that the buffer is not empty (
notEmpty.signal()
). - Releases the lock using
lock.unlock()
.
- Acquires the lock using
- take():
- Acquires the lock using
lock.lock()
. - Checks if the buffer is empty (
count == 0
). If empty, it waits (notEmpty.await()
). - Retrieves the item from the buffer at the
head
index. - Updates the
head
index and decrements thecount
. - Signals that the buffer is not full (
notFull.signal()
). - Releases the lock using
lock.unlock()
.
- Acquires the lock using
- Main Method:
- Creates a bounded buffer with a capacity of 5.
- Starts a producer thread that adds 10 items to the buffer.
- Starts a consumer thread that retrieves 10 items from the buffer.
- This implementation showcases how to use
ReentrantLock
andCondition
to manage a bounded buffer queue, ensuring that producers and consumers are properly synchronized.
It looks like the question is about implementing concurrent methods for a class that handles sending and receiving requests, similar to a bounded buffer or queue system. The goal might be to ensure that these operations are thread-safe and manage synchronization between producing (sending) and consuming (receiving) requests.
Possible Concurrency Question
Question:
Implement a backend class that handles sending and receiving requests using a bounded buffer. Complete the sendRequest
and receiveRequest
methods to achieve the following requirements:
- The
sendRequest
method should add a request to the buffer if there is space available, or block if the buffer is full. - The
receiveRequest
method should remove a request from the buffer if available, or block if the buffer is empty. - Ensure that the operations are thread-safe and handle synchronization between threads properly.
Implementation Steps
- Define the Class Structure: Create a class with a buffer to hold the requests, and synchronization primitives to manage concurrent access.
- Buffer Initialization: Initialize the buffer with a fixed size.
- Synchronization Mechanisms: Use
ReentrantLock
for mutual exclusion andCondition
objects to manage the buffer’s full and empty states. - Implement
sendRequest
Method: Ensure it adds requests to the buffer and handles blocking when the buffer is full. - Implement
receiveRequest
Method: Ensure it removes requests from the buffer and handles blocking when the buffer is empty.
Example Code
Here’s how you might implement the class:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class RequestBuffer<T> {
private final T[] buffer;
private int head, tail, count;
private final Lock lock;
private final Condition notFull, notEmpty;
@SuppressWarnings("unchecked")
public RequestBuffer(int size) {
buffer = (T[]) new Object[size];
head = tail = count = 0;
lock = new ReentrantLock();
notFull = lock.newCondition();
notEmpty = lock.newCondition();
}
// Method to send (produce) a request
public void sendRequest(T request) throws InterruptedException {
lock.lock();
try {
while (count == buffer.length) {
notFull.await(); // Wait until the buffer is not full
}
buffer[tail] = request;
tail = (tail + 1) % buffer.length;
count++;
notEmpty.signal(); // Signal that the buffer is not empty
} finally {
lock.unlock();
}
}
// Method to receive (consume) a request
public T receiveRequest() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await(); // Wait until the buffer is not empty
}
T request = buffer[head];
head = (head + 1) % buffer.length;
count--;
notFull.signal(); // Signal that the buffer is not full
return request;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
RequestBuffer<String> requestBuffer = new RequestBuffer<>(5);
// Producer thread
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
requestBuffer.sendRequest("Request " + i);
System.out.println("Sent: Request " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Consumer thread
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
String request = requestBuffer.receiveRequest();
System.out.println("Received: " + request);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
Detailed Explanation:
- Class Initialization:
- The buffer is initialized to a fixed size.
head
andtail
pointers manage the start and end of the queue.count
tracks the number of elements in the buffer.lock
ensures mutual exclusion.notFull
andnotEmpty
are conditions used to signal the state of the buffer.
- sendRequest(T request):
- Acquires the lock.
- Checks if the buffer is full. If it is, the method waits (
notFull.await()
). - Adds the request to the buffer.
- Updates the
tail
index and incrementscount
. - Signals that the buffer is not empty (
notEmpty.signal()
). - Releases the lock.
- receiveRequest():
- Acquires the lock.
- Checks if the buffer is empty. If it is, the method waits (
notEmpty.await()
). - Retrieves the request from the buffer.
- Updates the
head
index and decrementscount
. - Signals that the buffer is not full (
notFull.signal()
). - Releases the lock.
This implementation ensures that the sendRequest
and receiveRequest
methods are thread-safe and properly synchronized, managing the state of the buffer and coordinating producer and consumer threads.
Explanation of the Question
Question: Implement a WeightedCache
class that manages items with associated weights. The cache should only keep items whose total weight does not exceed a specified maximum total weight (maxTotalWeight
). You need to complete the sendRequest
and receiveRequest
methods to handle concurrent access to the cache in a thread-safe manner.
Strategy
- Class Initialization:
- Define a fixed-size buffer (
maxTotalWeight
) for the cache. - Use
ReentrantLock
to manage mutual exclusion and ensure thread safety. - Use
Condition
objects to manage the full and empty states of the buffer.
- Define a fixed-size buffer (
- Data Structures:
- Use a
HashMap
to store the cache items. - Use a
PriorityQueue
to manage the weights of the items for efficient access.
- Use a
- Concurrency Management:
- Implement the
put
method to add items to the cache. It should wait if adding the item exceedsmaxTotalWeight
. - Implement the
get
method to retrieve items from the cache in a thread-safe manner.
- Implement the
Code with Comments
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
// Class to represent the weighted cache
public class WeightedCache {
private final int maxTotalWeight; // Maximum allowed total weight for the cache
private int currentWeight; // Current total weight of items in the cache
private final Map<String, CacheItem> cache; // Map to store cache items
private final PriorityQueue<CacheItem> weightQueue; // Priority queue to manage item weights
private final Lock lock; // Lock for mutual exclusion
private final Condition notFull; // Condition to wait if the cache is full
// Cache item class to store key, value, and weight
private static class CacheItem {
String key;
String value;
int weight;
CacheItem(String key, String value, int weight) {
this.key = key;
this.value = value;
this.weight = weight;
}
}
// Constructor to initialize the weighted cache
public WeightedCache(int maxTotalWeight) {
this.maxTotalWeight = maxTotalWeight;
this.currentWeight = 0;
this.cache = new HashMap<>();
this.weightQueue = new PriorityQueue<>((a, b) -> Integer.compare(a.weight, b.weight));
this.lock = new ReentrantLock();
this.notFull = lock.newCondition();
}
// Method to get the value associated with a key in the cache
public String get(String key) {
lock.lock();
try {
CacheItem item = cache.get(key);
return item == null ? null : item.value;
} finally {
lock.unlock();
}
}
// Method to put a key-value pair in the cache with a specified weight
public void put(String key, String value, int weight) throws InterruptedException {
lock.lock();
try {
// Wait if adding the new item would exceed the max total weight
while (currentWeight + weight > maxTotalWeight) {
notFull.await();
}
// Remove the existing item if the key is already in the cache
if (cache.containsKey(key)) {
removeItem(key);
}
// Add the new item to the cache and priority queue
CacheItem newItem = new CacheItem(key, value, weight);
cache.put(key, newItem);
weightQueue.add(newItem);
currentWeight += weight;
// Signal other threads that they may proceed
notFull.signalAll();
} finally {
lock.unlock();
}
}
// Helper method to remove an item from the cache
private void removeItem(String key) {
CacheItem item = cache.remove(key);
if (item != null) {
weightQueue.remove(item);
currentWeight -= item.weight;
}
}
public static void main(String[] args) {
WeightedCache cache = new WeightedCache(10);
// Producer thread to add items to the cache
Thread producer = new Thread(() -> {
try {
cache.put("key1", "value1", 3);
System.out.println("Put key1: value1, weight 3");
cache.put("key2", "value2", 4);
System.out.println("Put key2: value2, weight 4");
cache.put("key3", "value3", 5);
System.out.println("Put key3: value3, weight 5");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Consumer thread to retrieve items from the cache
Thread consumer = new Thread(() -> {
try {
Thread.sleep(1000); // Allow producer to add items first
System.out.println("Get key1: " + cache.get("key1"));
System.out.println("Get key2: " + cache.get("key2"));
System.out.println("Get key3: " + cache.get("key3"));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
Detailed Explanation of Code with Comments:
- Class Initialization:
maxTotalWeight
: Defines the maximum weight allowed for items in the cache.currentWeight
: Tracks the total weight of items currently in the cache.cache
: AHashMap
to store the key-value pairs.weightQueue
: APriorityQueue
to manage and prioritize items based on their weight.lock
: AReentrantLock
to ensure that only one thread can modify the cache at a time.notFull
: ACondition
used to signal when space becomes available in the cache.
- CacheItem Class:
- Represents an item in the cache with a key, value, and weight.
- Constructor:
- Initializes the cache, priority queue, lock, and condition.
- get(String key):
- Acquires the lock using
lock.lock()
. - Retrieves the item from the cache if it exists.
- Releases the lock using
lock.unlock()
.
- Acquires the lock using
- put(String key, String value, int weight):
- Acquires the lock using
lock.lock()
. - Waits (
notFull.await()
) if adding the new item would exceed themaxTotalWeight
. - Removes the existing item if the key is already in the cache.
- Adds the new item to the cache and priority queue.
- Updates the
currentWeight
. - Signals other waiting threads (
notFull.signalAll()
) that space may be available in the cache. - Releases the lock using
lock.unlock()
.
- Acquires the lock using
- removeItem(String key):
- Removes the item from the cache and priority queue.
- Updates the
currentWeight
.
- Main Method:
- Creates a
WeightedCache
with a capacity of 10. - Starts a producer thread that adds items to the cache.
- Starts a consumer thread that retrieves items from the cache.
- Creates a
This implementation ensures that the cache operations are thread-safe and handle concurrent access properly, making efficient use of ReentrantLock
and Condition
to manage synchronization.