Java 1 producer - multiple consumer buffer deadlock

Discussion in 'Mixed Languages' started by Stannieman, Oct 28, 2013.

  1. Stannieman

    Stannieman MDL Guru

    Sep 4, 2009
    2,232
    1,818
    90
    #1 Stannieman, Oct 28, 2013
    Last edited by a moderator: Apr 20, 2017
    Hi,
    I'm writing a Java app and need a buffer with one producer and multiple consumers. The producer may only write to a buffer cell once every consumer has read it.

    I set up a test with 4 buffer cells and 3 consumers, plus some sleep time to check things out. The sleep times are chosen so that at some point the producer catches up with the consumers and has to wait.

    The problem is that once it tries to overtake them, a deadlock occurs: the producer somehow doesn't receive the signal to try again once cells have been read and are available again.

    In this example that happens at 12, which is the last value it writes. After that the remaining cells are read and the buffer becomes empty, but nothing is written to it anymore, even though I call notifyAll().

    Buffer
    Code:
    package stannieman.integrityChecker;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * Author: Stannieman
     * Date: 27/10/13
     * Time: 23:49
     *
     */
    public class Buffer {
        private int[] buffer;
        private int writeIdx;
        private int[] readIndexes;
        private boolean[][] occupied;

        public Buffer(int nBuffers, int nReaders){
            buffer = new int[nBuffers];
            writeIdx = 0;

            readIndexes = new int[nReaders];
            for (int i=0; i<nReaders; i++)
                readIndexes[i] = 0;

            occupied = new boolean[nBuffers][nReaders];
            for (int i=0; i<nBuffers; i++)
                for (int j=0; j<nReaders; j++)
                    occupied[i][j] = false;
        }

        public synchronized void write(int writeBytes) throws InterruptedException {
            boolean isCellOccupied = false;
            for (boolean occ : occupied[writeIdx])
                if (occ)
                    isCellOccupied = true;

            while(isCellOccupied)
                wait();

            buffer[writeIdx] = writeBytes;
            for (int i=0; i<occupied[writeIdx].length; i++)
                occupied[writeIdx][i] = true;

            if (writeIdx < buffer.length - 1)
                writeIdx++;
            else
                writeIdx = 0;

            notifyAll();
        }

        public synchronized int read(int readerId) throws InterruptedException {
            int returnBytes;

            while (!occupied[readIndexes[readerId]][readerId])
                wait();

            returnBytes = buffer[readIndexes[readerId]];

            occupied[readIndexes[readerId]][readerId] = false;

            if (readIndexes[readerId] < buffer.length - 1)
                readIndexes[readerId]++;
            else
                readIndexes[readerId] = 0;

            notifyAll();

            return returnBytes;
        }
    }
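
A likely cause, reading the `write` method above: `isCellOccupied` is computed once before the `while` loop and never updated, so once the producer finds the cell occupied it goes back to `wait()` after every `notifyAll()`, even when all consumers have read the cell. Below is a minimal sketch of one possible fix that re-evaluates the check on every wake-up. The class name `RecheckingBuffer` and the helper `isCellOccupied(int)` are made up for this illustration; the rest follows the original design.

```java
import java.util.Arrays;

/**
 * Sketch only: same bookkeeping as Buffer, but the occupancy test is
 * re-run inside the wait loop. RecheckingBuffer and isCellOccupied(int)
 * are hypothetical names, not part of the original code.
 */
class RecheckingBuffer {
    private final int[] buffer;
    private final int[] readIndexes;     // next cell each reader will read
    private final boolean[][] occupied;  // occupied[cell][reader]: not yet read
    private int writeIdx = 0;

    RecheckingBuffer(int nBuffers, int nReaders) {
        buffer = new int[nBuffers];
        readIndexes = new int[nReaders];             // Java zero-initializes
        occupied = new boolean[nBuffers][nReaders];  // Java initializes to false
    }

    // True while at least one reader still has to read the given cell.
    private boolean isCellOccupied(int cell) {
        for (boolean occ : occupied[cell]) {
            if (occ) {
                return true;
            }
        }
        return false;
    }

    public synchronized void write(int value) throws InterruptedException {
        // Re-evaluate the condition after every wake-up instead of caching it.
        while (isCellOccupied(writeIdx)) {
            wait();
        }
        buffer[writeIdx] = value;
        Arrays.fill(occupied[writeIdx], true);
        writeIdx = (writeIdx + 1) % buffer.length;
        notifyAll();
    }

    public synchronized int read(int readerId) throws InterruptedException {
        while (!occupied[readIndexes[readerId]][readerId]) {
            wait();
        }
        int idx = readIndexes[readerId];
        int value = buffer[idx];
        occupied[idx][readerId] = false;
        readIndexes[readerId] = (idx + 1) % buffer.length;
        notifyAll();
        return value;
    }
}
```

To try it in the test setup, `Filler` and `Extractor` would need their `Buffer` fields changed to this type; nothing else in the test has to change.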


    Filler (fills buffer)
    Code:
    package stannieman.integrityChecker;
    
    /**
     * Author: Stannieman
     * Date: 28/10/13
     * Time: 16:47
     */
    public class Filler implements Runnable{
        private Buffer buff;

        public Filler(Buffer buff)
        {
            this.buff = buff;
        }

        @Override
        public void run() {
            for (int i=0; i<30; i++)
            {
                try {
                    buff.write(i);
                    System.out.println("Write: " + i);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    Extractor (reads from buffer)
    Code:
    package stannieman.integrityChecker;
    
    /**
     * Author: Stannieman
     * Date: 28/10/13
     * Time: 16:52
     */
    public class Extractor implements Runnable {
        private int id;
        private Buffer buff;

        public Extractor(int id, Buffer buff){
            this.id = id;
            this.buff = buff;
        }

        @Override
        public void run() {
            for (int i=0; i<30; i++)
            {
                int read;
                try {
                    read = buff.read(id);
                    System.out.println("Read" + id + ": " + read);
                    Thread.sleep(700);
                } catch (InterruptedException e) {
                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
                }
            }
        }
    }

    Main (to start stuff)
    Code:
    package stannieman.integrityChecker;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Author: Stannieman
     * Date: 28/10/13
     * Time: 16:54
     */
    public class Main {
        public static void main(String[] args) throws InterruptedException {
            ExecutorService exServ = Executors.newCachedThreadPool();

            Buffer buff = new Buffer(4, 3);

            Filler fill = new Filler(buff);

            Extractor ext1 = new Extractor(0, buff);
            Extractor ext2 = new Extractor(1, buff);
            Extractor ext3 = new Extractor(2, buff);

            exServ.execute(fill);
            Thread.sleep(200);
            exServ.execute(ext1);
            exServ.execute(ext2);
            exServ.execute(ext3);

            exServ.shutdown();

        }
    }
    

    Thanks,
    Stan
     
  2. Stannieman

    #2 Stannieman, Oct 28, 2013
    Last edited: Nov 19, 2013
    (OP)
    The buffer currently stores ints, but it should store byte arrays, so the buffer variable would then be a 2D array. I only changed it to int for testing; the principle and the bug remain the same.
    That's why some of the variable names mention bytes.

    EDIT: I also tried it manually with a ReentrantLock and conditions. Same issue, but it happens at 13, so doing it manually seems to improve performance a bit.
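
The lock-based attempt is not posted, so the following is only a sketch of what a `ReentrantLock`/`Condition` variant could look like, assuming the same `occupied[cell][reader]` bookkeeping as the `Buffer` class in the first post. The class name `LockBuffer`, the helper `isCellOccupied(int)` and the condition names are made up for illustration. The point it shows is that each `await()` sits in a loop that re-evaluates its condition; if the check were computed once before the loop, as in the `synchronized` version, the producer would presumably hang the same way.

```java
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Sketch only: LockBuffer and its member names are hypothetical. */
class LockBuffer {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition cellFreed = lock.newCondition();    // signalled by readers
    private final Condition cellWritten = lock.newCondition();  // signalled by the writer

    private final int[] buffer;
    private final int[] readIndexes;     // next cell each reader will read
    private final boolean[][] occupied;  // occupied[cell][reader]: not yet read
    private int writeIdx = 0;

    LockBuffer(int nBuffers, int nReaders) {
        buffer = new int[nBuffers];
        readIndexes = new int[nReaders];
        occupied = new boolean[nBuffers][nReaders];
    }

    // True while at least one reader still has to read the given cell.
    private boolean isCellOccupied(int cell) {
        for (boolean occ : occupied[cell]) {
            if (occ) {
                return true;
            }
        }
        return false;
    }

    public void write(int value) throws InterruptedException {
        lock.lock();
        try {
            // The condition is checked again after every wake-up.
            while (isCellOccupied(writeIdx)) {
                cellFreed.await();
            }
            buffer[writeIdx] = value;
            Arrays.fill(occupied[writeIdx], true);
            writeIdx = (writeIdx + 1) % buffer.length;
            cellWritten.signalAll();  // wake every waiting reader
        } finally {
            lock.unlock();
        }
    }

    public int read(int readerId) throws InterruptedException {
        lock.lock();
        try {
            while (!occupied[readIndexes[readerId]][readerId]) {
                cellWritten.await();
            }
            int idx = readIndexes[readerId];
            int value = buffer[idx];
            occupied[idx][readerId] = false;
            readIndexes[readerId] = (idx + 1) % buffer.length;
            cellFreed.signalAll();  // let the writer re-check its cell
            return value;
        } finally {
            lock.unlock();
        }
    }
}
```

Using two conditions means readers are only woken by writes and the writer only by reads, which avoids some of the spurious wake-ups that a single `notifyAll()` causes.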


    EDIT: I switched to an ArrayBlockingQueue to solve it in a different way (not optimal though), so it doesn't really matter anymore. It would still be nice to get this fixed, though.
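
How the `ArrayBlockingQueue` was wired in is not shown. One common arrangement, given here purely as an assumption, is one bounded queue per consumer, with the producer putting each value into every queue. The class name `FanOutBuffer` is made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Sketch only: one bounded queue per reader; FanOutBuffer is a hypothetical name. */
class FanOutBuffer {
    private final List<BlockingQueue<Integer>> queues = new ArrayList<>();

    FanOutBuffer(int capacity, int nReaders) {
        for (int i = 0; i < nReaders; i++) {
            queues.add(new ArrayBlockingQueue<Integer>(capacity));
        }
    }

    // Blocks as soon as any reader's queue is full, so the producer
    // can never get more than `capacity` items ahead of the slowest reader.
    public void write(int value) throws InterruptedException {
        for (BlockingQueue<Integer> q : queues) {
            q.put(value);
        }
    }

    // Blocks until the producer has written something for this reader.
    public int read(int readerId) throws InterruptedException {
        return queues.get(readerId).take();
    }
}
```

The trade-off is that every value is stored once per consumer instead of once in a shared cell, which may be what makes this approach less than optimal for byte arrays.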
     