001/*
002// This software is subject to the terms of the Eclipse Public License v1.0
003// Agreement, available at the following URL:
004// http://www.eclipse.org/legal/epl-v10.html.
005// You must accept the terms of that agreement to use this software.
006//
007// Copyright (C) 2011-2013 Pentaho
008// All Rights Reserved.
009*/
010package mondrian.server;
011
012import mondrian.olap.MondrianProperties;
013import mondrian.olap.Util;
014import mondrian.rolap.RolapUtil;
015import mondrian.server.monitor.*;
016import mondrian.util.*;
017
018import org.apache.log4j.Logger;
019
020import java.util.*;
021import java.util.concurrent.ArrayBlockingQueue;
022import java.util.concurrent.BlockingQueue;
023
024/**
025 * Process that reads from the monitor stream and updates counters.
026 *
027 * <p>Internally, uses a dedicated thread to process events. Events received
028 * from log4j are placed on a queue. This "Active object" or "Actor" pattern
029 * means that the data structures that hold counters do not need to be locked.
030 *
031 * <p>Command requests are treated like events. They place their result on
032 * a result queue.</p>
033 *
034 * <p>A {@link Visitor visitor} quickly dispatches events and commands
035 * to the appropriate handler method.</p>
036 *
037 * <p>The monitored objects form a hierarchy. For each object type, there is
038 * a mutable workspace (whose members are private and non-final) that is
039 * converted into a monitor object (whose members are public and final) when
040 * its {@code fix()} method is called:</p>
041 *
042 * <ul>
043 *     <li>{@link MutableServerInfo} &rarr; {@link ServerInfo}</li>
044 *     <ul>
045 *         <li>{@link MutableConnectionInfo} &rarr; {@link ConnectionInfo}</li>
046 *         <ul>
047 *             <li>{@link MutableStatementInfo}
048 *                 &rarr; {@link StatementInfo}</li>
049 *             <ul>
050 *                 <li>{@link MutableExecutionInfo}
051 *                     &rarr; {@link ExecutionInfo}</li>
052 *                 <ul>
053 *                     <li>{@link MutableSqlStatementInfo}
054 *                         &rarr; {@link SqlStatementInfo}</li>
055 *                 </ul>
056 *             </ul>
057 *         </ul>
058 *     </ul>
059 * </ul>
060 */
061class MonitorImpl
062    implements Monitor
063{
064    private static final Logger LOGGER = Logger.getLogger(MonitorImpl.class);
065    private final Handler handler = new Handler();
066
067    protected static final Util.MemoryInfo MEMORY_INFO = Util.getMemoryInfo();
068
069    private static final Actor ACTOR = new Actor();
070
071    static {
072        // Create and start thread for actor.
073        //
074        // Actor is shared between all servers. This reduces concurrency, but
075        // not a concern because monitoring events are not very numerous.
076        // We tried creating one actor (and therefore thread) per server, but
077        // some applications (and in particular some tests) create lots of
078        // servers.
079        //
080        // The actor is shut down with the JVM.
081        final Thread thread = new Thread(ACTOR, "Mondrian Monitor");
082        thread.setDaemon(true);
083        thread.start();
084    }
085
086    /**
087     * Creates a Monitor.
088     */
089    public MonitorImpl() {
090    }
091
092    // Commands
093
094    public void shutdown() {
095        // Nothing to do. Cannot shut down actor, because it shared between
096        // all servers.
097    }
098
099    public void sendEvent(Event event) {
100        // The implementation does not need to take any locks.
101        try {
102            if (Thread.interrupted()) {
103                // Interrupt should not happen. Mondrian uses cancel without
104                // setting interrupt. But if interrupts are happening, it's
105                // best to know now, rather than failing next time we make a
106                // blocking system call.
107                throw new AssertionError();
108            }
109            ACTOR.eventQueue.put(Pair.<Handler, Message>of(handler, event));
110        } catch (InterruptedException e) {
111            throw Util.newError(e, "Exception while sending event " + event);
112        }
113    }
114
115    public ServerInfo getServer() {
116        return (ServerInfo) execute(new ServerCommand());
117    }
118
119    public List<ConnectionInfo> getConnections() {
120        //noinspection unchecked
121        return (List<ConnectionInfo>) execute(new ConnectionsCommand());
122    }
123
124    public List<StatementInfo> getStatements() {
125        //noinspection unchecked
126        return (List<StatementInfo>) execute(new StatementsCommand());
127    }
128
129    public List<SqlStatementInfo> getSqlStatements() {
130        //noinspection unchecked
131        return (List<SqlStatementInfo>) execute(new SqlStatementsCommand());
132    }
133
134    private Object execute(Command command) {
135        return ACTOR.execute(handler, command);
136    }
137
138    // Command and response classes
139
140    /**
141     * A kind of message that produces a response. The response may be null,
142     * but even so, it will be stored and the caller must collect it.
143     */
144    static abstract class Command implements Message {
145    }
146
147    static class StatementsCommand extends Command {
148        public <T> T accept(Visitor<T> visitor) {
149            return ((CommandVisitor<T>) visitor).visit(this);
150        }
151    }
152
153    static class SqlStatementsCommand extends Command {
154        public <T> T accept(Visitor<T> visitor) {
155            return ((CommandVisitor<T>) visitor).visit(this);
156        }
157    }
158
159    static class ConnectionsCommand extends Command {
160        public <T> T accept(Visitor<T> visitor) {
161            return ((CommandVisitor<T>) visitor).visit(this);
162        }
163    }
164
165    static class ServerCommand extends Command {
166        public <T> T accept(Visitor<T> visitor) {
167            return ((CommandVisitor<T>) visitor).visit(this);
168        }
169    }
170
171    static class ShutdownCommand extends Command {
172        public <T> T accept(Visitor<T> visitor) {
173            return ((CommandVisitor<T>) visitor).visit(this);
174        }
175    }
176
177    /**
178     * Extension to {@link Visitor} to allow commands as well as events.
179     *
180     * @param <T> Return type
181     */
182    static interface CommandVisitor<T> extends Visitor<T> {
183        T visit(ConnectionsCommand connectionsCommand);
184        T visit(ServerCommand serverCommand);
185        T visit(SqlStatementsCommand command);
186        T visit(StatementsCommand command);
187        T visit(ShutdownCommand command);
188    }
189
190    /**
191     * Workspace to collect statistics about the execution of a Mondrian server.
192     */
193    private static class MutableServerInfo {
194        private final MutableSqlStatementInfo aggSql =
195            new MutableSqlStatementInfo(null, -1, null, null);
196        private final MutableExecutionInfo aggExec =
197            new MutableExecutionInfo(null, -1, null);
198        private final MutableStatementInfo aggStmt =
199            new MutableStatementInfo(null, -1, null);
200        private final MutableConnectionInfo aggConn =
201            new MutableConnectionInfo(null);
202        private final String stack;
203
204        public MutableServerInfo(String stack) {
205            this.stack = stack;
206        }
207
208        public ServerInfo fix() {
209            Util.MemoryInfo.Usage memoryUsage = MEMORY_INFO.get();
210            return new ServerInfo(
211                stack,
212                aggConn.startCount,
213                aggConn.endCount,
214                aggStmt.startCount,
215                aggStmt.endCount,
216                aggSql.startCount,
217                aggSql.executeCount,
218                aggSql.endCount,
219                aggSql.rowFetchCount,
220                aggSql.executeNanos,
221                aggSql.cellRequestCount,
222                aggExec.cellCacheHitCount,
223                aggExec.cellCacheRequestCount,
224                aggExec.cellCacheMissCount,
225                aggExec.cellCachePendingCount,
226                aggExec.startCount,
227                aggExec.endCount,
228                memoryUsage.getUsed(),
229                memoryUsage.getCommitted(),
230                memoryUsage.getMax(),
231                (aggExec.cellCacheSegmentCreateCount
232                 - aggExec.cellCacheSegmentDeleteCount),
233                aggExec.cellCacheSegmentCreateCount,
234                aggExec.cellCacheSegmentCreateViaExternalCount,
235                aggExec.cellCacheSegmentDeleteViaExternalCount,
236                aggExec.cellCacheSegmentCreateViaRollupCount,
237                aggExec.cellCacheSegmentCreateViaSqlCount,
238                aggExec.cellCacheSegmentCellCount,
239                aggExec.cellCacheSegmentCoordinateSum);
240        }
241    }
242
243    /**
244     * Workspace to collect statistics about the execution of a Mondrian MDX
245     * statement. Parent context is the server.
246     */
247    private static class MutableConnectionInfo {
248        private final MutableExecutionInfo aggExec =
249            new MutableExecutionInfo(null, -1, null);
250        private final MutableStatementInfo aggStmt =
251            new MutableStatementInfo(null, -1, null);
252        private int startCount;
253        private int endCount;
254        private final String stack;
255
256        public MutableConnectionInfo(String stack) {
257            this.stack = stack;
258        }
259
260        public ConnectionInfo fix() {
261            return new ConnectionInfo(
262                stack,
263                aggExec.cellCacheHitCount,
264                aggExec.cellCacheRequestCount,
265                aggExec.cellCacheMissCount,
266                aggExec.cellCachePendingCount,
267                aggStmt.startCount,
268                aggStmt.endCount,
269                aggExec.startCount,
270                aggExec.endCount);
271        }
272    }
273
274    /**
275     * Workspace to collect statistics about the execution of a Mondrian MDX
276     * statement. Parent context is the connection.
277     */
278    private static class MutableStatementInfo {
279        private final MutableConnectionInfo conn;
280        private final long statementId;
281        private final MutableExecutionInfo aggExec =
282            new MutableExecutionInfo(null, -1, null);
283        private final MutableSqlStatementInfo aggSql =
284            new MutableSqlStatementInfo(null, -1, null, null);
285        private int startCount;
286        private int endCount;
287        private final String stack;
288
289        public MutableStatementInfo(
290            MutableConnectionInfo conn,
291            long statementId,
292            String stack)
293        {
294            this.statementId = statementId;
295            this.conn = conn;
296            this.stack = stack;
297        }
298
299        public StatementInfo fix() {
300            return new StatementInfo(
301                stack,
302                statementId,
303                aggExec.startCount,
304                aggExec.endCount,
305                aggExec.phaseCount,
306                aggExec.cellCacheRequestCount,
307                aggExec.cellCacheHitCount,
308                aggExec.cellCacheMissCount,
309                aggExec.cellCachePendingCount,
310                aggSql.startCount,
311                aggSql.executeCount,
312                aggSql.endCount,
313                aggSql.rowFetchCount,
314                aggSql.executeNanos,
315                aggSql.cellRequestCount);
316        }
317    }
318
319    /**
320     * <p>Workspace to collect statistics about the execution of a Mondrian MDX
321     * statement. A statement execution occurs within the context of a
322     * statement.</p>
323     *
324     * <p>Most statements are executed only once. It is possible
325     * (if you use the {@link org.olap4j.PreparedOlapStatement} API for
326     * instance) to execute a statement more than once. There can be at most
327     * one execution at a time for a given statement. Thus a statement's
328     * executeStartCount and executeEndCount should never differ by more than
329     * 1.</p>
330     */
331    private static class MutableExecutionInfo {
332        private final MutableStatementInfo stmt;
333        private final long executionId;
334        private final MutableSqlStatementInfo aggSql =
335            new MutableSqlStatementInfo(null, -1, null, null);
336        private int startCount;
337        private int phaseCount;
338        private int endCount;
339        private int cellCacheRequestCount;
340        private int cellCacheHitCount;
341        private int cellCacheMissCount;
342        private int cellCachePendingCount;
343        private int cellCacheHitCountDelta;
344        private int cellCacheMissCountDelta;
345        private int cellCachePendingCountDelta;
346        private int cellCacheSegmentCreateCount;
347        private int cellCacheSegmentCreateViaRollupCount;
348        private int cellCacheSegmentCreateViaSqlCount;
349        private int cellCacheSegmentCreateViaExternalCount;
350        private int cellCacheSegmentDeleteViaExternalCount;
351        private int cellCacheSegmentDeleteCount;
352        private int cellCacheSegmentCoordinateSum;
353        private int cellCacheSegmentCellCount;
354        private final String stack;
355
356        public MutableExecutionInfo(
357            MutableStatementInfo stmt,
358            long executionId,
359            String stack)
360        {
361            this.stmt = stmt;
362            this.executionId = executionId;
363            this.stack = stack;
364        }
365
366        public ExecutionInfo fix() {
367            return new ExecutionInfo(
368                stack,
369                executionId,
370                phaseCount,
371                cellCacheRequestCount,
372                cellCacheHitCount,
373                cellCacheMissCount,
374                cellCachePendingCount,
375                aggSql.startCount,
376                aggSql.executeCount,
377                aggSql.endCount,
378                aggSql.rowFetchCount,
379                aggSql.executeNanos,
380                aggSql.cellRequestCount);
381        }
382    }
383
384    /**
385     * Workspace to collect statistics about the execution of a SQL statement.
386     * A SQL statement execution occurs within the context of a Mondrian MDX
387     * statement.
388     */
389    private static class MutableSqlStatementInfo {
390        private final MutableStatementInfo stmt; // parent context
391        private final long sqlStatementId;
392        private int startCount;
393        private int executeCount;
394        private int endCount;
395        private int cellRequestCount;
396        private long executeNanos;
397        private long rowFetchCount;
398        private final String stack;
399        private final String sql;
400
401        public MutableSqlStatementInfo(
402            MutableStatementInfo stmt,
403            long sqlStatementId,
404            String sql,
405            String stack)
406        {
407            this.sqlStatementId = sqlStatementId;
408            this.stmt = stmt;
409            this.sql = sql;
410            this.stack = stack;
411        }
412
413        public SqlStatementInfo fix() {
414            return new SqlStatementInfo(
415                stack,
416                sqlStatementId,
417                sql);
418        }
419    }
420
421    private static class Handler implements CommandVisitor<Object> {
422
423        private final MutableServerInfo server =
424            new MutableServerInfo(null);
425
426        private final Map<Integer, MutableConnectionInfo> connectionMap =
427            new LinkedHashMap<Integer, MutableConnectionInfo>(
428                MondrianProperties.instance().ExecutionHistorySize.get(),
429                0.8f,
430                false)
431            {
432                private final int maxSize =
433                    MondrianProperties.instance().ExecutionHistorySize.get();
434                private static final long serialVersionUID = 1L;
435                protected boolean removeEldestEntry(
436                    Map.Entry<Integer, MutableConnectionInfo> e)
437                {
438                    if (size() > maxSize) {
439                        if (RolapUtil.MONITOR_LOGGER.isTraceEnabled()) {
440                            RolapUtil.MONITOR_LOGGER.trace(
441                                "ConnectionInfo("
442                                + e.getKey()
443                                + ") evicted. Stack is:"
444                                + Util.nl
445                                + e.getValue().stack);
446                        }
447                        return true;
448                    }
449                    return false;
450                }
451            };
452
453        private final Map<Long, MutableSqlStatementInfo> sqlStatementMap =
454            new LinkedHashMap<Long, MutableSqlStatementInfo>(
455                MondrianProperties.instance().ExecutionHistorySize.get(),
456                0.8f,
457                false)
458            {
459                private final int maxSize =
460                    MondrianProperties.instance().ExecutionHistorySize.get();
461                private static final long serialVersionUID = 1L;
462                protected boolean removeEldestEntry(
463                    Map.Entry<Long, MutableSqlStatementInfo> e)
464                {
465                    if (size() > maxSize) {
466                        if (RolapUtil.MONITOR_LOGGER.isTraceEnabled()) {
467                            RolapUtil.MONITOR_LOGGER.trace(
468                                "StatementInfo("
469                                + e.getKey()
470                                + ") evicted. Stack is:"
471                                + Util.nl
472                                + e.getValue().stack);
473                        }
474                        return true;
475                    }
476                    return false;
477                }
478            };
479
480        private final Map<Long, MutableStatementInfo> statementMap =
481            new LinkedHashMap<Long, MutableStatementInfo>(
482                MondrianProperties.instance().ExecutionHistorySize.get(),
483                0.8f,
484                false)
485            {
486                private final int maxSize =
487                    MondrianProperties.instance().ExecutionHistorySize.get();
488                private static final long serialVersionUID = 1L;
489                protected boolean removeEldestEntry(
490                    Map.Entry<Long, MutableStatementInfo> e)
491                {
492                    if (size() > maxSize) {
493                        if (RolapUtil.MONITOR_LOGGER.isTraceEnabled()) {
494                            RolapUtil.MONITOR_LOGGER.trace(
495                                "StatementInfo("
496                                + e.getKey()
497                                + ") evicted. Stack is:"
498                                + Util.nl
499                                + e.getValue().stack);
500                        }
501                        return true;
502                    }
503                    return false;
504                }
505            };
506
507        private final Map<Long, MutableExecutionInfo> executionMap =
508            new LinkedHashMap<Long, MutableExecutionInfo>(
509                MondrianProperties.instance().ExecutionHistorySize.get(),
510                0.8f,
511                false)
512            {
513                private final int maxSize =
514                    MondrianProperties.instance().ExecutionHistorySize.get();
515                private static final long serialVersionUID = 1L;
516                protected boolean removeEldestEntry(
517                    Map.Entry<Long, MutableExecutionInfo> e)
518                {
519                    if (size() > maxSize) {
520                        if (RolapUtil.MONITOR_LOGGER.isTraceEnabled()) {
521                            RolapUtil.MONITOR_LOGGER.trace(
522                                "ExecutionInfo("
523                                + e.getKey()
524                                + ") evicted. Stack is:"
525                                + Util.nl
526                                + e.getValue().stack);
527                        }
528                        return true;
529                    }
530                    return false;
531                }
532        };
533
534        /**
535         * Holds info for executions that have ended. Cell cache events may
536         * arrive late, and this map lets them get into the system.
537         */
538        private final Map<Long, MutableExecutionInfo> retiredExecutionMap =
539            new LinkedHashMap<Long, MutableExecutionInfo>(
540                MondrianProperties.instance().ExecutionHistorySize.get(),
541                0.8f,
542                false)
543            {
544                private final int maxSize =
545                    MondrianProperties.instance().ExecutionHistorySize.get();
546                private static final long serialVersionUID = 1L;
547                protected boolean removeEldestEntry(
548                    Map.Entry<Long, MutableExecutionInfo> e)
549                {
550                    if (size() > maxSize) {
551                        if (RolapUtil.MONITOR_LOGGER.isTraceEnabled()) {
552                            RolapUtil.MONITOR_LOGGER.trace(
553                                "Retired ExecutionInfo("
554                                + e.getKey()
555                                + ") evicted. Stack is:"
556                                + Util.nl
557                                + e.getValue().stack);
558                        }
559                        return true;
560                    }
561                    return false;
562                }
563        };
564
565        /**
566         * Method for debugging that does nothing, but is a place to put a break
567         * point to find out places where an event or its parent should be
568         * registered but is not.
569         *
570         * @param event Event
571         * @return Always null
572         */
573        private Object missing(Event event) {
574            return null;
575        }
576
577        public Object visit(ConnectionStartEvent event) {
578            final MutableConnectionInfo conn =
579                new MutableConnectionInfo(event.stack);
580            connectionMap.put(event.connectionId, conn);
581            foo(conn, event);
582            foo(server.aggConn, event);
583            if (RolapUtil.MONITOR_LOGGER.isTraceEnabled()) {
584                RolapUtil.MONITOR_LOGGER.trace(
585                    "Connection("
586                    + event.connectionId
587                    + ") created. stack is:"
588                    + Util.nl
589                    + event.stack);
590            }
591            return null;
592        }
593
594        private void foo(
595            MutableConnectionInfo conn,
596            ConnectionStartEvent event)
597        {
598            ++conn.startCount;
599        }
600
601        public Object visit(ConnectionEndEvent event) {
602            final MutableConnectionInfo conn =
603                connectionMap.remove(event.connectionId);
604            if (conn == null) {
605                return missing(event);
606            }
607            foo(conn, event);
608            foo(server.aggConn, event);
609
610            // Since the connection info will no longer be in the table,
611            // broadcast the final info to anyone who is interested.
612            RolapUtil.MONITOR_LOGGER.debug(conn.fix());
613            return null;
614        }
615
616        private void foo(
617            MutableConnectionInfo conn,
618            ConnectionEndEvent event)
619        {
620            ++conn.endCount;
621        }
622
623        public Object visit(StatementStartEvent event) {
624            final MutableConnectionInfo conn =
625                connectionMap.get(event.connectionId);
626            if (conn == null) {
627                return missing(event);
628            }
629            final MutableStatementInfo stmt =
630                new MutableStatementInfo(
631                    conn, event.statementId, event.stack);
632            statementMap.put(event.statementId, stmt);
633            foo(stmt, event);
634            foo(conn.aggStmt, event);
635            foo(server.aggStmt, event);
636            if (RolapUtil.MONITOR_LOGGER.isTraceEnabled()) {
637                RolapUtil.MONITOR_LOGGER.trace(
638                    "Statement("
639                    + event.statementId
640                    + ") created. stack is:"
641                    + Util.nl
642                    + event.stack);
643            }
644            return null;
645        }
646
647        private void foo(
648            MutableStatementInfo stmt,
649            StatementStartEvent event)
650        {
651            ++stmt.startCount;
652        }
653
654        public Object visit(StatementEndEvent event) {
655            final MutableStatementInfo stmt =
656                statementMap.remove(event.statementId);
657            if (stmt == null) {
658                return missing(event);
659            }
660            foo(stmt, event);
661            foo(stmt.conn.aggStmt, event);
662            foo(server.aggStmt, event);
663
664            // Since the statement info will no longer be in the table,
665            // broadcast the final info to anyone who is interested.
666            RolapUtil.MONITOR_LOGGER.debug(stmt.fix());
667            return null;
668        }
669
670        private void foo(
671            MutableStatementInfo stmt,
672            StatementEndEvent event)
673        {
674            ++stmt.endCount;
675        }
676
677        public Object visit(ExecutionStartEvent event) {
678            MutableStatementInfo stmt =
679                statementMap.get(event.statementId);
680            if (stmt == null) {
681                return missing(event);
682            }
683            final MutableExecutionInfo exec =
684                new MutableExecutionInfo(
685                    stmt, event.executionId, event.stack);
686            executionMap.put(event.executionId, exec);
687
688            foo(exec, event);
689            foo(stmt.aggExec, event);
690            foo(stmt.conn.aggExec, event);
691            foo(server.aggExec, event);
692            if (RolapUtil.MONITOR_LOGGER.isTraceEnabled()) {
693                RolapUtil.MONITOR_LOGGER.trace(
694                    "Execution("
695                    + event.executionId
696                    + ") created. stack is:"
697                    + Util.nl
698                    + event.stack);
699            }
700            return null;
701        }
702
703        private void foo(
704            MutableExecutionInfo exec,
705            ExecutionStartEvent event)
706        {
707            ++exec.startCount;
708        }
709
710        public Object visit(ExecutionPhaseEvent event) {
711            final MutableExecutionInfo exec =
712                executionMap.get(event.executionId);
713            if (exec == null) {
714                return missing(event);
715            }
716            executionMap.put(event.executionId, exec);
717
718            foo(exec, event);
719            foo(exec.stmt.aggExec, event);
720            foo(exec.stmt.conn.aggExec, event);
721            foo(server.aggExec, event);
722            return null;
723        }
724
725        private void foo(
726            MutableExecutionInfo exec,
727            ExecutionPhaseEvent event)
728        {
729            ++exec.phaseCount;
730            exec.cellCacheHitCountDelta = event.hitCount;
731            exec.cellCacheMissCountDelta = event.missCount;
732            exec.cellCachePendingCountDelta = event.pendingCount;
733        }
734
735        public Object visit(ExecutionEndEvent event) {
736            final MutableExecutionInfo exec =
737                executionMap.remove(event.executionId);
738            if (exec == null) {
739                return missing(event);
740            }
741            retiredExecutionMap.put(exec.executionId, exec);
742            foo(exec, event);
743            foo(exec.stmt.aggExec, event);
744            foo(exec.stmt.conn.aggExec, event);
745            foo(server.aggExec, event);
746
747            // Since the execution info will no longer be in the table,
748            // broadcast the final info to anyone who is interested.
749            RolapUtil.MONITOR_LOGGER.debug(exec.fix());
750            return null;
751        }
752
753        private void foo(
754            MutableExecutionInfo exec,
755            ExecutionEndEvent event)
756        {
757            // NOTE: 'exec.phaseCount += event.phaseCount' would be wrong,
758            // because we have already incremented each time we got an
759            // ExecutionPhaseEvent. For a similar reason, we do not update
760            // exec.cellCacheHitCount etc. each phase.
761
762            ++exec.endCount;
763            ++exec.phaseCount;
764            exec.cellCacheHitCount += event.cellCacheHitCount;
765            exec.cellCacheMissCount += event.cellCacheMissCount;
766            exec.cellCachePendingCount += event.cellCachePendingCount;
767            exec.cellCacheRequestCount +=
768                (event.cellCacheHitCount
769                + event.cellCacheMissCount
770                + event.cellCachePendingCount);
771            exec.cellCacheHitCountDelta = 0;
772            exec.cellCacheMissCountDelta = 0;
773            exec.cellCachePendingCountDelta = 0;
774        }
775
776        public Object visit(CellCacheSegmentCreateEvent event) {
777            MutableExecutionInfo exec =
778                executionMap.get(event.executionId);
779            if (exec == null) {
780                // Cache events can sometimes arrive after the execution has
781                // ended. So, look into the retired map.
782                exec = retiredExecutionMap.get(event.executionId);
783                if (exec == null) {
784                    return missing(event);
785                }
786            }
787
788            foo(exec, event);
789            foo(exec.stmt.aggExec, event);
790            foo(exec.stmt.conn.aggExec, event);
791            foo(server.aggExec, event);
792            return null;
793        }
794
795        private void foo(
796            MutableExecutionInfo exec,
797            CellCacheSegmentCreateEvent event)
798        {
799            ++exec.cellCacheSegmentCreateCount;
800            exec.cellCacheSegmentCoordinateSum += event.coordinateCount;
801            exec.cellCacheSegmentCellCount += event.actualCellCount;
802            switch (event.source) {
803            case ROLLUP:
804                ++exec.cellCacheSegmentCreateViaRollupCount;
805                break;
806            case EXTERNAL:
807                ++exec.cellCacheSegmentCreateViaExternalCount;
808                break;
809            case SQL:
810                ++exec.cellCacheSegmentCreateViaSqlCount;
811                break;
812            default:
813                throw Util.unexpected(event.source);
814            }
815        }
816
817        public Object visit(CellCacheSegmentDeleteEvent event) {
818            final MutableExecutionInfo exec =
819                executionMap.get(event.executionId);
820            if (exec == null) {
821                return missing(event);
822            }
823
824            foo(exec, event);
825            foo(exec.stmt.aggExec, event);
826            foo(exec.stmt.conn.aggExec, event);
827            foo(server.aggExec, event);
828            return null;
829        }
830
831        private void foo(
832            MutableExecutionInfo exec,
833            CellCacheSegmentDeleteEvent event)
834        {
835            ++exec.cellCacheSegmentDeleteCount;
836            exec.cellCacheSegmentCoordinateSum -= event.coordinateCount;
837            switch (event.source) {
838            case EXTERNAL:
839                ++exec.cellCacheSegmentDeleteViaExternalCount;
840                break;
841            }
842        }
843
844        public Object visit(SqlStatementStartEvent event) {
845            final MutableStatementInfo stmt =
846                statementMap.get(
847                    event.getStatementId());
848            if (stmt == null) {
849                return missing(event);
850            }
851            final MutableSqlStatementInfo sql =
852                new MutableSqlStatementInfo(
853                    stmt,
854                    event.sqlStatementId,
855                    event.sql,
856                    event.stack);
857            sqlStatementMap.put(event.sqlStatementId, sql);
858            foo(sql, event);
859            foo(sql.stmt.aggSql, event);
860            foo(server.aggSql, event);
861            if (RolapUtil.MONITOR_LOGGER.isTraceEnabled()) {
862                RolapUtil.MONITOR_LOGGER.trace(
863                    "SqlStatement("
864                    + event.sqlStatementId
865                    + ") created. stack is:"
866                    + Util.nl
867                    + event.stack);
868            }
869            return null;
870        }
871
872        private void foo(
873            MutableSqlStatementInfo sql,
874            SqlStatementStartEvent event)
875        {
876            ++sql.startCount;
877            sql.cellRequestCount += event.cellRequestCount;
878        }
879
880        public Object visit(SqlStatementExecuteEvent event) {
881            final MutableSqlStatementInfo sql =
882                sqlStatementMap.get(event.sqlStatementId);
883            if (sql == null) {
884                return missing(event);
885            }
886            foo(sql, event);
887            foo(sql.stmt.aggSql, event);
888            foo(server.aggSql, event);
889            return null;
890        }
891
892        private void foo(
893            MutableSqlStatementInfo sql,
894            SqlStatementExecuteEvent event)
895        {
896            ++sql.executeCount;
897            sql.executeNanos += event.executeNanos;
898        }
899
900        public Object visit(SqlStatementEndEvent event) {
901            final MutableSqlStatementInfo sql =
902                sqlStatementMap.remove(event.sqlStatementId);
903            if (sql == null) {
904                return missing(event);
905            }
906            foo(sql, event);
907            foo(sql.stmt.aggSql, event);
908            foo(server.aggSql, event);
909
910            // Since the SQL statement info will no longer be in the table,
911            // broadcast the final info to anyone who is interested.
912            RolapUtil.MONITOR_LOGGER.debug(sql.fix());
913            return null;
914        }
915
916        private void foo(
917            MutableSqlStatementInfo sql,
918            SqlStatementEndEvent event)
919        {
920            ++sql.endCount;
921            sql.rowFetchCount += event.rowFetchCount;
922        }
923
924        public Object visit(ConnectionsCommand connectionsCommand) {
925            List<ConnectionInfo> list =
926                new ArrayList<ConnectionInfo>();
927            for (MutableConnectionInfo info : connectionMap.values()) {
928                list.add(info.fix());
929            }
930            return list;
931        }
932
933        public Object visit(ServerCommand serverCommand) {
934            return server.fix();
935        }
936
937        public Object visit(SqlStatementsCommand command) {
938            List<SqlStatementInfo> list =
939                new ArrayList<SqlStatementInfo>();
940            for (MutableSqlStatementInfo info : sqlStatementMap.values()) {
941                list.add(info.fix());
942            }
943            return list;
944        }
945
946        public Object visit(StatementsCommand command) {
947            List<StatementInfo> list =
948                new ArrayList<StatementInfo>();
949            for (MutableStatementInfo info : statementMap.values()) {
950                list.add(info.fix());
951            }
952            return list;
953        }
954
955        public Object visit(ShutdownCommand command) {
956            return "Shutdown succeeded";
957        }
958    }
959
960    private static class Actor implements Runnable {
961        private boolean running = true;
962
963        private final BlockingQueue<Pair<Handler, Message>> eventQueue =
964            new ArrayBlockingQueue<Pair<Handler, Message>>(1000);
965
966        private final BlockingHashMap<Command, Object> responseMap =
967            new BlockingHashMap<Command, Object>(1000);
968
969        public void run() {
970            try {
971                for (;;) {
972                    try {
973                        final Pair<Handler, Message> entry = eventQueue.take();
974                        final Handler handler = entry.left;
975                        final Message message = entry.right;
976                        final Object result = message.accept(handler);
977                        if (message instanceof Command) {
978                            responseMap.put((Command) message, result);
979                        } else {
980                            // Broadcast the event to anyone who is interested.
981                            RolapUtil.MONITOR_LOGGER.debug(message);
982                        }
983                        if (message instanceof ShutdownCommand) {
984                            LOGGER.debug(
985                                "ShutdownCommand received. Monitor thread is shutting down.");
986                            return;
987                        }
988                    } catch (InterruptedException e) {
989                        Thread.currentThread().interrupt();
990                        LOGGER.warn(
991                            "Monitor thread interrupted.",
992                            e);
993                        return;
994                    } catch (Throwable t) {
995                        LOGGER.error(
996                            "Runtime error on the monitor thread.",
997                            t);
998                    }
999                }
1000            } finally {
1001                running = false;
1002            }
1003        }
1004
1005        public void shutdown() {
1006            // No point sending a command if (for some reason) there's no thread
1007            // listening to the command queue.
1008            if (running) {
1009                execute(null, new ShutdownCommand());
1010            }
1011        }
1012
1013        Object execute(Handler handler, Command command) {
1014            try {
1015                eventQueue.put(Pair.<Handler, Message>of(handler, command));
1016            } catch (InterruptedException e) {
1017                throw Util.newError(e, "Interrupted while sending " + command);
1018            }
1019            try {
1020                return responseMap.get(command);
1021            } catch (InterruptedException e) {
1022                throw Util.newError(e, "Interrupted while awaiting " + command);
1023            }
1024        }
1025    }
1026}
1027
1028// End MonitorImpl.java