001/* 002// This software is subject to the terms of the Eclipse Public License v1.0 003// Agreement, available at the following URL: 004// http://www.eclipse.org/legal/epl-v10.html. 005// You must accept the terms of that agreement to use this software. 006// 007// Copyright (C) 2011-2013 Pentaho and others 008// All Rights Reserved. 009*/ 010package mondrian.rolap; 011 012import mondrian.olap.*; 013import mondrian.resource.MondrianResource; 014import mondrian.server.Execution; 015import mondrian.util.Pair; 016 017import org.eigenbase.util.property.IntegerProperty; 018 019import java.util.List; 020import java.util.Timer; 021import java.util.TimerTask; 022import java.util.concurrent.*; 023 024/** 025 * A utility class for {@link RolapConnection}. It specializes in 026 * shepherding the creation of RolapResult by running the actual execution 027 * on a separate thread from the user thread so we can: 028 * <ul> 029 * <li>Monitor all executions for timeouts and resource limits as they run 030 * in the background</li> 031 * <li>Bubble exceptions to the user thread as fast as they happen.</li> 032 * <li>Gracefully cancel all SQL statements and cleanup in the background.</li> 033 * </ul> 034 * 035 * @author LBoudreau 036 */ 037public class RolapResultShepherd { 038 039 /** 040 * An executor service used for both the shepherd thread and the 041 * Execution objects. 042 */ 043 private final ExecutorService executor; 044 045 /** 046 * List of tasks that should be monitored by the shepherd thread. 047 */ 048 private final List<Pair<FutureTask<Result>, Execution>> tasks = 049 new CopyOnWriteArrayList<Pair<FutureTask<Result>,Execution>>(); 050 051 private final Timer timer = 052 Util.newTimer("mondrian.rolap.RolapResultShepherd#timer", true); 053 054 public RolapResultShepherd() { 055 final IntegerProperty property = 056 MondrianProperties.instance().RolapConnectionShepherdNbThreads; 057 final int maximumPoolSize = property.get(); 058 executor = 059 Util.getExecutorService( 060 maximumPoolSize, 061 0, 1, 062 "mondrian.rolap.RolapResultShepherd$executor", 063 new RejectedExecutionHandler() { 064 public void rejectedExecution( 065 Runnable r, 066 ThreadPoolExecutor executor) 067 { 068 throw MondrianResource.instance().QueryLimitReached.ex( 069 maximumPoolSize, 070 property.getPath()); 071 } 072 }); 073 final Pair<Long, TimeUnit> interval = 074 Util.parseInterval( 075 String.valueOf( 076 MondrianProperties.instance() 077 .RolapConnectionShepherdThreadPollingInterval.get()), 078 TimeUnit.MILLISECONDS); 079 long period = interval.right.toMillis(interval.left); 080 timer.schedule( 081 new TimerTask() { 082 public void run() { 083 for (final Pair<FutureTask<Result>, Execution> task 084 : tasks) 085 { 086 if (task.left.isDone()) { 087 tasks.remove(task); 088 continue; 089 } 090 if (task.right.isCancelOrTimeout()) { 091 // Remove it from the list so that we know 092 // it was cleaned once. 093 tasks.remove(task); 094 095 // Cancel the FutureTask for which 096 // the user thread awaits. The user 097 // thread will call 098 // Execution.checkCancelOrTimeout 099 // later and take care of sending 100 // an exception on the user thread. 101 task.left.cancel(false); 102 } 103 } 104 } 105 }, 106 period, 107 period); 108 } 109 110 /** 111 * Executes and shepherds the execution of an Execution instance. 112 * The shepherd will wrap the Execution instance into a Future object 113 * which can be monitored for exceptions. If any are encountered, 114 * two things will happen. First, the user thread will be returned and 115 * the resulting exception will bubble up. Second, the execution thread 116 * will attempt to do a graceful stop of all running SQL statements and 117 * release all other resources gracefully in the background. 118 * @param execution An Execution instance. 119 * @param callable A callable to monitor returning a Result instance. 120 * @throws ResourceLimitExceededException if some resource limit specified 121 * in the property file was exceeded 122 * @throws QueryCanceledException if query was canceled during execution 123 * @throws QueryTimeoutException if query exceeded timeout specified in 124 * the property file 125 * @return A Result object, as supplied by the Callable passed as a 126 * parameter. 127 */ 128 public Result shepherdExecution( 129 Execution execution, 130 Callable<Result> callable) 131 { 132 // We must wrap this execution into a task that so that we are able 133 // to monitor, cancel and detach from it. 134 FutureTask<Result> task = new FutureTask<Result>(callable); 135 136 // Register this task with the shepherd thread 137 final Pair<FutureTask<Result>, Execution> pair = 138 new Pair<FutureTask<Result>, Execution>( 139 task, 140 execution); 141 tasks.add(pair); 142 143 try { 144 // Now run it. 145 executor.execute(task); 146 return task.get(); 147 } catch (Throwable e) { 148 // Make sure to clean up pending SQL queries. 149 execution.cancelSqlStatements(); 150 151 // Make sure to propagate the interruption flag. 152 if (e instanceof InterruptedException) { 153 Thread.currentThread().interrupt(); 154 } 155 156 // Unwrap any java.concurrent wrappers. 157 Throwable node = e; 158 if (e instanceof ExecutionException) { 159 ExecutionException executionException = (ExecutionException) e; 160 node = executionException.getCause(); 161 } 162 163 // Let the Execution throw whatever it wants to, this way the 164 // API contract is respected. The program should in most cases 165 // stop here as most exceptions will originate from the Execution 166 // instance. 167 execution.checkCancelOrTimeout(); 168 169 // We must also check for ResourceLimitExceededExceptions, 170 // which might be wrapped by an ExecutionException. In order to 171 // respect the API contract, we must throw the cause, not the 172 // wrapper. 173 final ResourceLimitExceededException t = 174 Util.getMatchingCause( 175 node, ResourceLimitExceededException.class); 176 if (t != null) { 177 throw t; 178 } 179 180 // Check for Mondrian exceptions in the exception chain. 181 // we can throw these back as-is. 182 final MondrianException m = 183 Util.getMatchingCause( 184 node, MondrianException.class); 185 if (m != null) { 186 // Throw that. 187 throw m; 188 } 189 190 // Since we got here, this means that the exception was 191 // something else. Just wrap/throw. 192 if (node instanceof RuntimeException) { 193 throw (RuntimeException) node; 194 } else if (node instanceof Error) { 195 throw (Error) node; 196 } else { 197 throw new MondrianException(node); 198 } 199 } 200 } 201 202 public void shutdown() { 203 this.timer.cancel(); 204 this.executor.shutdown(); 205 this.tasks.clear(); 206 } 207} 208 209// End RolapResultShepherd.java 210