001/*
002// This software is subject to the terms of the Eclipse Public License v1.0
003// Agreement, available at the following URL:
004// http://www.eclipse.org/legal/epl-v10.html.
005// You must accept the terms of that agreement to use this software.
006//
007// Copyright (C) 2011-2012 Pentaho
008// All Rights Reserved.
009*/
010package mondrian.util;
011
012import org.apache.log4j.Logger;
013
014import java.util.concurrent.*;
015import java.util.concurrent.locks.ReentrantReadWriteLock;
016
017/**
018 * Implementation of {@link java.util.concurrent.Future} that completes
019 * when a thread writes a value (or an exception) into a slot.
020 */
021public class SlotFuture<V> implements Future<V> {
022    private V value;
023    private Throwable throwable;
024    private boolean done;
025    private boolean cancelled;
026    private final CountDownLatch dataGate = new CountDownLatch(1);
027    private final ReentrantReadWriteLock stateLock =
028        new ReentrantReadWriteLock();
029    private static final Logger LOG = Logger.getLogger(SlotFuture.class);
030
031    private final String thisString;
032
033    /**
034     * Creates a SlotFuture.
035     */
036    public SlotFuture() {
037        thisString = super.toString();
038    }
039
040    @Override
041    public String toString() {
042        return thisString;
043    }
044
045    /**
046     * {@inheritDoc}
047     *
048     * <p>The SlotFuture does not know which thread is computing the result
049     * and therefore the {@code mayInterruptIfRunning} parameter is ignored.</p>
050     */
051    public boolean cancel(boolean mayInterruptIfRunning) {
052        stateLock.writeLock().lock();
053        try {
054            if (!done) {
055                cancelled = true;
056                done = true;
057                dataGate.countDown();
058                return true;
059            } else {
060                return false;
061            }
062        } finally {
063            stateLock.writeLock().unlock();
064        }
065    }
066
067    public boolean isCancelled() {
068        stateLock.readLock().lock();
069        try {
070            return cancelled;
071        } finally {
072            stateLock.readLock().unlock();
073        }
074    }
075
076    public boolean isDone() {
077        stateLock.readLock().lock();
078        try {
079            return done || cancelled || throwable != null;
080        } finally {
081            stateLock.readLock().unlock();
082        }
083    }
084
085    public V get() throws ExecutionException, InterruptedException {
086        // Wait for a put, fail or cancel
087        dataGate.await();
088
089        // Now a put, fail or cancel has occurred, state does not change; we
090        // don't need even a read lock.
091        if (throwable != null) {
092            throw new ExecutionException(throwable);
093        }
094        return value;
095    }
096
097    public V get(long timeout, TimeUnit unit)
098        throws ExecutionException, InterruptedException, TimeoutException
099    {
100        // Wait for a put, fail or cancel
101        if (!dataGate.await(timeout, unit)) {
102            throw new TimeoutException();
103        }
104
105        // Now a put, fail or cancel has occurred, state does not change; we
106        // don't need even a read lock.
107        if (throwable != null) {
108            throw new ExecutionException(throwable);
109        }
110        return value;
111    }
112
113    /**
114     * Writes a value into the slot, indicating that the task has completed
115     * successfully.
116     *
117     * @param value Value to yield as the result of the computation
118     *
119     * @throws IllegalArgumentException if put, fail or cancel has already
120     *     been invoked on this future
121     */
122    public void put(V value) {
123        stateLock.writeLock().lock(); // need exclusive write access to state
124        try {
125            if (done) {
126                final String message =
127                    "Future is already done (cancelled=" + cancelled
128                    + ", value=" + this.value
129                    + ", throwable=" + throwable + ")";
130                LOG.error(message);
131                throw new IllegalArgumentException(
132                    message);
133            }
134            this.value = value;
135            this.done = true;
136        } finally {
137            stateLock.writeLock().unlock();
138        }
139        dataGate.countDown();
140    }
141
142    /**
143     * Writes a throwable into the slot, indicating that the task has failed.
144     *
145     * @param throwable Exception that aborted the computation
146     *
147     * @throws IllegalArgumentException if put, fail or cancel has already
148     *     been invoked on this future
149     */
150    public void fail(Throwable throwable) {
151        stateLock.writeLock().lock(); // need exclusive write access to state
152        try {
153            if (done) {
154                throw new IllegalArgumentException(
155                    "Future is already done (cancelled=" + cancelled
156                    + ", value=" + value
157                    + ", throwable=" + this.throwable + ")");
158            }
159            this.throwable = throwable;
160            this.done = true;
161        } finally {
162            stateLock.writeLock().unlock();
163        }
164        dataGate.countDown();
165    }
166}
167
168// End SlotFuture.java