๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ
๊ฐœ์ธ ๊ณต๋ถ€/Java, Python

[Java] BlockingQueue

by syLim___ 2023. 10. 18.
728x90

โœ… BlockingQueue

 

 - ํ๊ฐ€ ๊ฝ‰ ์ฐผ์„ ๋•Œ Object๋ฅผ ์‚ฝ์ž…ํ•˜๋ ค๊ณ  ํ•  ๋•Œ๋Š” ๊ณต๊ฐ„์ด ์ƒ๊ธธ ๋•Œ๊นŒ์ง€ waitํ•˜๊ณ ,

    ํ๊ฐ€ ๋น„์—ˆ์„ ๋•Œ Object๋ฅผ ๊บผ๋‚ด๋ ค๊ณ  ํ•  ๋•Œ๋Š” ํ์— ์ƒˆ๋กœ์šด Object๊ฐ€ ์‚ฝ์ž…๋  ๋•Œ๊นŒ์ง€ waitํ•˜๋Š” ๊ธฐ๋Šฅ์„ ๊ฐ€์ง€๊ณ  ์žˆ๋Š” ํ์ด๋‹ค.

 - Thread-safeํ•˜๋‹ค.

 - ์ฒ˜์Œ์—๋Š” Producer-Consumer Queue๋ฅผ ๊ตฌํ˜„ํ•˜๊ธฐ ์œ„ํ•ด ๋งŒ๋“ค์–ด์กŒ๋‹ค๊ณ  ํ•œ๋‹ค.

 

โœ… BlockingQueue Methods

 

 - BlockingQueue์˜ ๋ฉ”์„œ๋“œ๋“ค์€ ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ–ˆ์„ ๋•Œ ์–ด๋–ป๊ฒŒ ์ฒ˜๋ฆฌํ•˜๋А๋ƒ์— ๋”ฐ๋ผ ๋‹ค์Œ๊ณผ ๊ฐ™์ด ๋ถ„๋ฅ˜ํ•  ์ˆ˜ ์žˆ๋‹ค.

 

  Throws Exception Special value Blocks Times out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek() - -

 

    - Throws Exception : ์˜ˆ์™ธ ๋ฐœ์ƒ์‹œ Exception์„ ๋˜์ง„๋‹ค

    - Special value : ์˜ˆ์™ธ ๋ฐœ์ƒ์‹œ ํŠน์ • ๊ฐ’(null์ด๋‚˜ false)์„ ๋ฐ˜ํ™˜ํ•œ๋‹ค.

    - Blocks : ์˜ˆ์™ธ ๋ฐœ์ƒ์‹œ waitํ•œ๋‹ค.

    - Times out : ์˜ˆ์™ธ ๋ฐœ์ƒ์‹œ(์ž‘์—…์„ ๋‹น์žฅ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์—†์œผ๋ฉด) waitํ•œ๋‹ค. ์ง€์ •ํ•œ time limit ๋‚ด์— ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ๊ฒŒ ๋˜๋ฉด ์ˆ˜ํ–‰ ํ›„ true๋ฅผ ๋ฆฌํ„ดํ•˜๊ณ , timeout๋˜๋ฉด false๋ฅผ ๋ฆฌํ„ดํ•œ๋‹ค.

 

 - ๊ฐ ๋ฉ”์„œ๋“œ์— ๋Œ€ํ•œ ์ž์„ธํ•œ ์„ค๋ช…์€ ๊ณต์‹ ๋ฌธ์„œ ์ฐธ๊ณ 

 

โœ… Producer-Consumer problem ํ•ด๊ฒฐ

 

์—ฌ๋Ÿฌ ๊ฐœ์˜ Producer ์Šค๋ ˆ๋“œ์™€, ์—ฌ๋Ÿฌ ๊ฐœ์˜ Consumer ์Šค๋ ˆ๋“œ๋ฅผ ์‹คํ–‰ํ•  ๋•Œ,

BlockingQueue๋ฅผ ์ด์šฉํ•˜์—ฌ ์ƒ์‚ฐ์ž-์†Œ๋น„์ž ๋ฌธ์ œ๋ฅผ ์‰ฝ๊ฒŒ ํ•ด๊ฒฐํ•  ์ˆ˜ ์žˆ๋‹ค.

 

 

Java

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

public class BlockingQueueEx {
    public static void main(String[] args) {
    
        BlockingQueue buffer = new LinkedBlockingQueue();
        
        Producer p1 = new Producer(buffer, "p1");
        Producer p2 = new Producer(buffer, "p2");
        Consumer c1 = new Consumer(buffer, "c1");
        Consumer c2 = new Consumer(buffer, "c2");
        new Thread(p1).start();
        new Thread(p2).start();
        new Thread(c1).start();
        new Thread(c2).start();
    }
}

class Producer implements Runnable{

    Logger logger = Logger.getLogger("logger");
    static AtomicInteger i = new AtomicInteger();
    private final BlockingQueue queue;
    String name;

    Producer (BlockingQueue queue, String name){
        this.queue = queue;
        this.name = name;
    }

    public int produce() throws InterruptedException {
        int data = i.incrementAndGet();
        logger.info(String.format("%s ์ด/๊ฐ€ ๋ฐ์ดํ„ฐ๋ฅผ ์‚ฝ์ž…ํ•ฉ๋‹ˆ๋‹ค: %d", name, data));
        Thread.sleep(500);
        return data;
    }

    @Override
    public void run() {
        try{
            while(true){
                queue.put(produce());
            }
        } catch (InterruptedException ignore){

        }
    }
}

class Consumer implements Runnable {
    Logger logger = Logger.getLogger("logger");
    private final BlockingQueue queue;
    String name;

    Consumer(BlockingQueue queue, String name){
        this.queue = queue;
        this.name = name;
    }

    public void consume(Object target) throws InterruptedException {
        logger.info(String.format("%s ์ด/๊ฐ€ ๋ฐ์ดํ„ฐ๋ฅผ ์†Œ๋น„ํ•ฉ๋‹ˆ๋‹ค: %d " , name, target));
        Thread.sleep(500);
    }

    @Override
    public void run(){
        try{
            while(true){
                consume(queue.take());
            }
        } catch (InterruptedException ignore){

        }
    }
}

 

 - ์‹คํ–‰ ๊ฒฐ๊ณผ

 

 

โœ… Reference

 

 - https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/BlockingQueue.html

728x90

'๊ฐœ์ธ ๊ณต๋ถ€ > Java, Python' ์นดํ…Œ๊ณ ๋ฆฌ์˜ ๋‹ค๋ฅธ ๊ธ€

[Java] Reflection  (0) 2023.11.17
[Java] equals(), hashCode()  (0) 2023.10.25
[Java] Java TCP socket programming  (0) 2023.09.19
[Java] ์ž๋ฐ” ๋กœ๊น…(Logging)  (0) 2023.09.06
๊ฐ์ฒด์ง€ํ–ฅ์˜ ํŠน์ง•  (0) 2023.07.24