001/*
002// This software is subject to the terms of the Eclipse Public License v1.0
003// Agreement, available at the following URL:
004// http://www.eclipse.org/legal/epl-v10.html.
005// You must accept the terms of that agreement to use this software.
006//
007// Copyright (C) 2011-2013 Pentaho and others
008// All Rights Reserved.
009*/
010package mondrian.rolap;
011
012import mondrian.olap.*;
013import mondrian.resource.MondrianResource;
014import mondrian.server.Execution;
015import mondrian.util.Pair;
016
017import org.eigenbase.util.property.IntegerProperty;
018
019import java.util.List;
020import java.util.Timer;
021import java.util.TimerTask;
022import java.util.concurrent.*;
023
024/**
025 * A utility class for {@link RolapConnection}. It specializes in
026 * shepherding the creation of RolapResult by running the actual execution
027 * on a separate thread from the user thread so we can:
028 * <ul>
029 * <li>Monitor all executions for timeouts and resource limits as they run
030 * in the background</li>
031 * <li>Bubble exceptions to the user thread as fast as they happen.</li>
032 * <li>Gracefully cancel all SQL statements and cleanup in the background.</li>
033 * </ul>
034 *
035 * @author LBoudreau
036 */
037public class RolapResultShepherd {
038
039    /**
040     * An executor service used for both the shepherd thread and the
041     * Execution objects.
042     */
043    private final ExecutorService executor;
044
045    /**
046     * List of tasks that should be monitored by the shepherd thread.
047     */
048    private final List<Pair<FutureTask<Result>, Execution>> tasks =
049        new CopyOnWriteArrayList<Pair<FutureTask<Result>,Execution>>();
050
051    private final Timer timer =
052        Util.newTimer("mondrian.rolap.RolapResultShepherd#timer", true);
053
054    public RolapResultShepherd() {
055        final IntegerProperty property =
056            MondrianProperties.instance().RolapConnectionShepherdNbThreads;
057        final int maximumPoolSize = property.get();
058        executor =
059            Util.getExecutorService(
060                maximumPoolSize,
061                0, 1,
062                "mondrian.rolap.RolapResultShepherd$executor",
063                new RejectedExecutionHandler() {
064                    public void rejectedExecution(
065                        Runnable r,
066                        ThreadPoolExecutor executor)
067                    {
068                        throw MondrianResource.instance().QueryLimitReached.ex(
069                            maximumPoolSize,
070                            property.getPath());
071                    }
072                });
073        final Pair<Long, TimeUnit> interval =
074            Util.parseInterval(
075                String.valueOf(
076                    MondrianProperties.instance()
077                        .RolapConnectionShepherdThreadPollingInterval.get()),
078                TimeUnit.MILLISECONDS);
079        long period = interval.right.toMillis(interval.left);
080        timer.schedule(
081            new TimerTask() {
082                public void run() {
083                    for (final Pair<FutureTask<Result>, Execution> task
084                        : tasks)
085                    {
086                        if (task.left.isDone()) {
087                            tasks.remove(task);
088                            continue;
089                        }
090                        if (task.right.isCancelOrTimeout()) {
091                            // Remove it from the list so that we know
092                            // it was cleaned once.
093                            tasks.remove(task);
094
095                            // Cancel the FutureTask for which
096                            // the user thread awaits. The user
097                            // thread will call
098                            // Execution.checkCancelOrTimeout
099                            // later and take care of sending
100                            // an exception on the user thread.
101                            task.left.cancel(false);
102                        }
103                    }
104                }
105            },
106            period,
107            period);
108    }
109
110    /**
111     * Executes and shepherds the execution of an Execution instance.
112     * The shepherd will wrap the Execution instance into a Future object
113     * which can be monitored for exceptions. If any are encountered,
114     * two things will happen. First, the user thread will be returned and
115     * the resulting exception will bubble up. Second, the execution thread
116     * will attempt to do a graceful stop of all running SQL statements and
117     * release all other resources gracefully in the background.
118     * @param execution An Execution instance.
119     * @param callable A callable to monitor returning a Result instance.
120     * @throws ResourceLimitExceededException if some resource limit specified
121     * in the property file was exceeded
122     * @throws QueryCanceledException if query was canceled during execution
123     * @throws QueryTimeoutException if query exceeded timeout specified in
124     * the property file
125     * @return A Result object, as supplied by the Callable passed as a
126     * parameter.
127     */
128    public Result shepherdExecution(
129        Execution execution,
130        Callable<Result> callable)
131    {
132        // We must wrap this execution into a task that so that we are able
133        // to monitor, cancel and detach from it.
134        FutureTask<Result> task = new FutureTask<Result>(callable);
135
136        // Register this task with the shepherd thread
137        final Pair<FutureTask<Result>, Execution> pair =
138            new Pair<FutureTask<Result>, Execution>(
139                task,
140                execution);
141        tasks.add(pair);
142
143        try {
144            // Now run it.
145            executor.execute(task);
146            return task.get();
147        } catch (Throwable e) {
148            // Make sure to clean up pending SQL queries.
149            execution.cancelSqlStatements();
150
151            // Make sure to propagate the interruption flag.
152            if (e instanceof InterruptedException) {
153                Thread.currentThread().interrupt();
154            }
155
156            // Unwrap any java.concurrent wrappers.
157            Throwable node = e;
158            if (e instanceof ExecutionException) {
159                ExecutionException executionException = (ExecutionException) e;
160                node = executionException.getCause();
161            }
162
163            // Let the Execution throw whatever it wants to, this way the
164            // API contract is respected. The program should in most cases
165            // stop here as most exceptions will originate from the Execution
166            // instance.
167            execution.checkCancelOrTimeout();
168
169            // We must also check for ResourceLimitExceededExceptions,
170            // which might be wrapped by an ExecutionException. In order to
171            // respect the API contract, we must throw the cause, not the
172            // wrapper.
173            final ResourceLimitExceededException t =
174                Util.getMatchingCause(
175                    node, ResourceLimitExceededException.class);
176            if (t != null) {
177                throw t;
178            }
179
180            // Check for Mondrian exceptions in the exception chain.
181            // we can throw these back as-is.
182            final MondrianException m =
183                Util.getMatchingCause(
184                    node, MondrianException.class);
185            if (m != null) {
186                // Throw that.
187                throw m;
188            }
189
190            // Since we got here, this means that the exception was
191            // something else. Just wrap/throw.
192            if (node instanceof RuntimeException) {
193                throw (RuntimeException) node;
194            } else if (node instanceof Error) {
195                throw (Error) node;
196            } else {
197                throw new MondrianException(node);
198            }
199        }
200    }
201
202    public void shutdown() {
203        this.timer.cancel();
204        this.executor.shutdown();
205        this.tasks.clear();
206    }
207}
208
209// End RolapResultShepherd.java
210