001/*
002// This software is subject to the terms of the Eclipse Public License v1.0
003// Agreement, available at the following URL:
004// http://www.eclipse.org/legal/epl-v10.html.
005// You must accept the terms of that agreement to use this software.
006//
007// Copyright (C) 2011-2013 Pentaho
008// All Rights Reserved.
009*/
010package mondrian.server;
011
012import mondrian.olap.*;
013import mondrian.resource.MondrianResource;
014import mondrian.rolap.RolapConnection;
015import mondrian.rolap.agg.SegmentCacheManager;
016import mondrian.server.monitor.*;
017
018import org.apache.log4j.MDC;
019
020import java.util.*;
021import java.util.Map.Entry;
022
023import java.util.concurrent.atomic.AtomicLong;
024
025/**
026 * Execution context.
027 *
028 * <p>Loosely corresponds to a CellSet. A given statement may be executed
029 * several times over its lifetime, but at most one execution can be going
030 * on at a time.</p>
031 *
032 * @author jhyde
033 */
034public class Execution {
035    /**
036     * Used for MDX logging, allows for a MDX Statement UID.
037     */
038    private static AtomicLong SEQ = new AtomicLong();
039
040    final StatementImpl statement;
041
042    /**
043     * Holds a collection of the SqlStatements which were used by this
044     * execution instance. All operations on the map must be synchronized
045     * on it.
046     */
047    private final Map<Locus, java.sql.Statement> statements =
048        new HashMap<Locus, java.sql.Statement>();
049
050    private State state = State.FRESH;
051
052    /**
053     * Lock monitor for SQL statements. All operations on
054     * {@link Execution#statements}
055     * need to be synchronized on this.
056     */
057    private final Object sqlStateLock = new Object();
058
059    /**
060     * This is a lock object to sync on when changing
061     * the {@link #state} variable.
062     */
063    private final Object stateLock = new Object();
064
065    /**
066     * If not <code>null</code>, this query was notified that it
067     * might cause an OutOfMemoryError.
068     */
069    private String outOfMemoryMsg;
070
071    private long startTimeMillis;
072    private long timeoutTimeMillis;
073    private long timeoutIntervalMillis;
074    private final QueryTiming queryTiming = new QueryTiming();
075    private int phase;
076    private int cellCacheHitCount;
077    private int cellCacheMissCount;
078    private int cellCachePendingCount;
079
080    /**
081     * Execution id, global within this JVM instance.
082     */
083    private final long id;
084
085    public static final Execution NONE = new Execution(null, 0);
086
087    private final Map<String, Object> mdc =
088        new HashMap<String, Object>();
089
090    private final Execution parent;
091
092    public Execution(
093        Statement statement,
094        long timeoutIntervalMillis)
095    {
096        Execution parentExec;
097        try {
098            parentExec = Locus.peek().execution;
099        } catch (EmptyStackException e) {
100            parentExec = null;
101        }
102        this.parent = parentExec;
103        this.id = SEQ.getAndIncrement();
104        this.statement = (StatementImpl) statement;
105        this.timeoutIntervalMillis = timeoutIntervalMillis;
106    }
107
108    /**
109     * Copy the current MDC so it can be used later
110     */
111    public void copyMDC() {
112        this.mdc.clear();
113        final Map<String, Object> currentMdc =
114            MDC.getContext();
115        if (currentMdc != null) {
116            this.mdc.putAll(currentMdc);
117        }
118    }
119
120    /**
121     * Set the copied mdc into the current MDC. This should be called
122     * any time there will be logging in a thread handled by the
123     * RolapResultShepherd where original MDC needs to be retrieved
124     */
125    public void setContextMap() {
126        final Map<String, Object> old = MDC.getContext();
127        if (old != null) {
128            old.clear();
129            old.putAll(mdc);
130        }
131    }
132
133    /**
134     * Marks the start of an Execution instance. It is called by
135     * {@link Statement#start(Execution)} automatically. Users don't
136     * need to call this method.
137     */
138    public void start() {
139        assert this.state == State.FRESH;
140        this.startTimeMillis = System.currentTimeMillis();
141        this.timeoutTimeMillis =
142            timeoutIntervalMillis > 0
143                ? this.startTimeMillis + timeoutIntervalMillis
144                : 0L;
145        this.state = State.RUNNING;
146        this.queryTiming.init(true);
147        fireExecutionStartEvent();
148    }
149
150    private String getMdx() {
151        final Query query =
152            statement.query;
153        return query != null
154            ? Util.unparse(query)
155            : null;
156    }
157
158    public void tracePhase(
159        int hitCount,
160        int missCount,
161        int pendingCount)
162    {
163        final RolapConnection connection = statement.getMondrianConnection();
164        final MondrianServer server = connection.getServer();
165        final int hitCountInc = hitCount - this.cellCacheHitCount;
166        final int missCountInc = missCount - this.cellCacheMissCount;
167        final int pendingCountInc = pendingCount - this.cellCachePendingCount;
168        server.getMonitor().sendEvent(
169            new ExecutionPhaseEvent(
170                System.currentTimeMillis(),
171                server.getId(),
172                connection.getId(),
173                statement.getId(),
174                id,
175                phase,
176                hitCountInc,
177                missCountInc,
178                pendingCountInc));
179        ++phase;
180        this.cellCacheHitCount = hitCount;
181        this.cellCacheMissCount = missCount;
182        this.cellCachePendingCount = pendingCount;
183    }
184
185    /**
186     * Cancels the execution instance.
187     */
188    public void cancel() {
189        synchronized (stateLock) {
190            this.state = State.CANCELED;
191            this.cancelSqlStatements();
192            if (parent != null) {
193                parent.cancel();
194            }
195            fireExecutionEndEvent();
196        }
197    }
198
199    /**
200     * This method will change the state of this execution to
201     * {@link State#ERROR} and will set the message to display.
202     * Cleanup of the resources used by this execution instance
203     * will be performed in the background later on.
204     * @param msg The message to display to the user, describing
205     * the problem encountered with the memory space.
206     */
207    public final void setOutOfMemory(String msg) {
208        synchronized (stateLock) {
209            assert msg != null;
210            this.outOfMemoryMsg = msg;
211            this.state = State.ERROR;
212        }
213    }
214
215    /**
216     * Checks the state of this Execution and throws an exception
217     * if something is wrong. This method should be called by the
218     * user thread.
219     * <p>It won't throw anything if the query has successfully completed.
220     * @throws MondrianException The exception encountered.
221     */
222    public synchronized void checkCancelOrTimeout() throws MondrianException {
223        if (parent != null) {
224            parent.checkCancelOrTimeout();
225        }
226        boolean needInterrupt = false;
227        switch (this.state) {
228        case CANCELED:
229            try {
230                if (Thread.interrupted()) {
231                    // Checking the state of the thread will clear the
232                    // interrupted flag so we can send an event out.
233                    // After that, we make sure that we set it again
234                    // so the thread state remains consistent.
235                    needInterrupt = true;
236                }
237                fireExecutionEndEvent();
238            } finally {
239                if (needInterrupt) {
240                    Thread.currentThread().interrupt();
241                }
242            }
243            throw MondrianResource.instance().QueryCanceled.ex();
244        case RUNNING:
245            if (timeoutTimeMillis > 0) {
246                long currTime = System.currentTimeMillis();
247                if (currTime > timeoutTimeMillis) {
248                    this.state = State.TIMEOUT;
249                    fireExecutionEndEvent();
250                    throw MondrianResource.instance().QueryTimeout.ex(
251                        timeoutIntervalMillis / 1000);
252                }
253            }
254            break;
255        case ERROR:
256            try {
257                if (Thread.interrupted()) {
258                    // Checking the state of the thread will clear the
259                    // interrupted flag so we can send an event out.
260                    // After that, we make sure that we set it again
261                    // so the thread state remains consistent.
262                    needInterrupt = true;
263                }
264                fireExecutionEndEvent();
265            } finally {
266                if (needInterrupt) {
267                    Thread.currentThread().interrupt();
268                }
269            }
270            throw new MemoryLimitExceededException(outOfMemoryMsg);
271        }
272    }
273
274    /**
275     * Returns whether this execution is currently in a failed
276     * state and will throw an exception as soon as the next check
277     * is performed using {@link Execution#checkCancelOrTimeout()}.
278     * @return True or false, depending on the timeout state.
279     */
280    public boolean isCancelOrTimeout() {
281        if (parent != null
282            && parent.isCancelOrTimeout())
283        {
284            return true;
285        }
286        synchronized (stateLock) {
287            if (state == State.CANCELED
288                || state == State.ERROR
289                || state == State.TIMEOUT
290                || (state == State.RUNNING
291                && timeoutTimeMillis > 0
292                && System.currentTimeMillis() > timeoutTimeMillis))
293            {
294                return true;
295            }
296            return false;
297        }
298    }
299
300    /**
301     * Tells whether this execution is done executing.
302     */
303    public boolean isDone() {
304        synchronized (stateLock) {
305            switch (this.state) {
306            case CANCELED:
307            case DONE:
308            case ERROR:
309            case TIMEOUT:
310                return true;
311            default:
312                return false;
313            }
314        }
315    }
316
317    /**
318     * Called by the RolapResultShepherd when the execution needs to clean all
319     * of its resources for whatever reasons, typically when an exception
320     * has occurred or the execution has ended. Any currently running SQL
321     * statements will be canceled. It should only be called if
322     * {@link Execution#isCancelOrTimeout()} returns true.
323     *
324     * <p>This method doesn't need to be called by a user. It will be called
325     * internally by Mondrian when the system is ready to clean the remaining
326     * resources.
327     *
328     * <p>To check if this execution is failed, use
329     * {@link Execution#isCancelOrTimeout()} instead.
330     */
331    public void cancelSqlStatements() {
332        if (parent != null) {
333            parent.cancelSqlStatements();
334        }
335        synchronized (sqlStateLock) {
336            for (Iterator<Entry<Locus, java.sql.Statement>> iterator =
337                     statements.entrySet().iterator();
338                 iterator.hasNext();)
339            {
340                // Remove entry from the map before trying to cancel the
341                // statement, so that if the cancel throws, we will not try to
342                // cancel again. It's possible that we will try to cancel the
343                // other statements later.
344                final Entry<Locus, java.sql.Statement> entry = iterator.next();
345                final java.sql.Statement statement1 = entry.getValue();
346                iterator.remove();
347                // We only want to cancel the statement, but we can't close it.
348                // Some drivers will not notice the interruption flag on their
349                // own thread before a considerable time has passed. If we were
350                // using a pooling layer, calling close() would make the
351                // underlying connection available again, despite the first
352                // statement still being processed. Some drivers will fail
353                // there. It is therefore important to close and release the
354                // resources on the proper thread, namely, the thread which
355                // runs the actual statement.
356                Util.cancelStatement(statement1);
357            }
358            // Also cleanup the segment registrations from the index.
359            unregisterSegmentRequests();
360        }
361    }
362
363    /**
364     * Called when query execution has completed.  Once query execution has
365     * ended, it is not possible to cancel or timeout the query until it
366     * starts executing again.
367     */
368    public void end() {
369        synchronized (stateLock) {
370            queryTiming.done();
371            if (this.state == State.FRESH
372                || this.state == State.RUNNING)
373            {
374                this.state = State.DONE;
375            }
376            // Clear pointer to pending SQL statements
377            statements.clear();
378            // Unregister all segments
379            unregisterSegmentRequests();
380            // Fire up a monitor event.
381            fireExecutionEndEvent();
382        }
383    }
384
385    /**
386     * Calls into the SegmentCacheManager and unregisters all the
387     * registrations made for this execution on segments form
388     * the index.
389     */
390    public void unregisterSegmentRequests() {
391        // We also have to cancel all requests for the current segments.
392        final Locus locus =
393            new Locus(
394                this,
395                "Execution.unregisterSegmentRequests",
396                "cleaning up segment registrations");
397        final SegmentCacheManager mgr =
398            locus.getServer()
399                .getAggregationManager().cacheMgr;
400        mgr.execute(
401            new SegmentCacheManager.Command<Void>() {
402                public Void call() throws Exception {
403                    mgr.getIndexRegistry()
404                        .cancelExecutionSegments(Execution.this);
405                    return null;
406                }
407                public Locus getLocus() {
408                    return locus;
409                }
410            });
411    }
412
413    public final long getStartTime() {
414        return startTimeMillis;
415    }
416
417    public final mondrian.server.Statement getMondrianStatement() {
418        return statement;
419    }
420
421    public final QueryTiming getQueryTiming() {
422        return queryTiming;
423    }
424
425    public final long getId() {
426        return id;
427    }
428
429    public final long getElapsedMillis() {
430        return System.currentTimeMillis() - startTimeMillis;
431    }
432
433    /**
434     * This method is typically called by SqlStatement at construction time.
435     * It ties all Statement objects to a particular Execution instance
436     * so that we can audit, monitor and gracefully cancel an execution.
437     * @param statement The statement used by this execution.
438     */
439    public void registerStatement(Locus locus, java.sql.Statement statement) {
440        synchronized (sqlStateLock) {
441            synchronized (stateLock) {
442                if (state == State.FRESH) {
443                    start();
444                }
445                if (state == State.RUNNING) {
446                    this.statements.put(locus, statement);
447                }
448            }
449        }
450    }
451
452    private void fireExecutionEndEvent() {
453        final RolapConnection connection =
454            statement.getMondrianConnection();
455        final MondrianServer server = connection.getServer();
456        server.getMonitor().sendEvent(
457            new ExecutionEndEvent(
458                this.startTimeMillis,
459                server.getId(),
460                connection.getId(),
461                this.statement.getId(),
462                this.id,
463                this.phase,
464                this.state,
465                this.cellCacheHitCount,
466                this.cellCacheMissCount,
467                this.cellCachePendingCount));
468    }
469
470    private void fireExecutionStartEvent() {
471        final RolapConnection connection =
472            statement.getMondrianConnection();
473        final MondrianServer server = connection.getServer();
474        server.getMonitor().sendEvent(
475            new ExecutionStartEvent(
476                startTimeMillis,
477                server.getId(),
478                connection.getId(),
479                statement.getId(),
480                id,
481                getMdx()));
482    }
483
484    /**
485     * Enumeration of the states of an Execution instance.
486     */
487    public enum State {
488        /**
489         * Identifies the state in which an execution is before it has
490         * started resolving the query. This doesn't mean that there are
491         * no current SQL statements already beeing executed.
492         */
493        FRESH,
494        RUNNING,
495        ERROR,
496        CANCELED,
497        TIMEOUT,
498        DONE,
499    }
500}
501
502// End Execution.java