001/* 002// This software is subject to the terms of the Eclipse Public License v1.0 003// Agreement, available at the following URL: 004// http://www.eclipse.org/legal/epl-v10.html. 005// You must accept the terms of that agreement to use this software. 006// 007// Copyright (C) 2011-2013 Pentaho 008// All Rights Reserved. 009*/ 010package mondrian.server; 011 012import mondrian.olap.MondrianProperties; 013import mondrian.olap.Util; 014import mondrian.rolap.RolapUtil; 015import mondrian.server.monitor.*; 016import mondrian.util.*; 017 018import org.apache.log4j.Logger; 019 020import java.util.*; 021import java.util.concurrent.ArrayBlockingQueue; 022import java.util.concurrent.BlockingQueue; 023 024/** 025 * Process that reads from the monitor stream and updates counters. 026 * 027 * <p>Internally, uses a dedicated thread to process events. Events received 028 * from log4j are placed on a queue. This "Active object" or "Actor" pattern 029 * means that the data structures that hold counters do not need to be locked. 030 * 031 * <p>Command requests are treated like events. They place their result on 032 * a result queue.</p> 033 * 034 * <p>A {@link Visitor visitor} quickly dispatches events and commands 035 * to the appropriate handler method.</p> 036 * 037 * <p>The monitored objects form a hierarchy. For each object type, there is 038 * a mutable workspace (whose members are private and non-final) that is 039 * converted into a monitor object (whose members are public and final) when 040 * its {@code fix()} method is called:</p> 041 * 042 * <ul> 043 * <li>{@link MutableServerInfo} → {@link ServerInfo}</li> 044 * <ul> 045 * <li>{@link MutableConnectionInfo} → {@link ConnectionInfo}</li> 046 * <ul> 047 * <li>{@link MutableStatementInfo} 048 * → {@link StatementInfo}</li> 049 * <ul> 050 * <li>{@link MutableExecutionInfo} 051 * → {@link ExecutionInfo}</li> 052 * <ul> 053 * <li>{@link MutableSqlStatementInfo} 054 * → {@link SqlStatementInfo}</li> 055 * </ul> 056 * </ul> 057 * </ul> 058 * </ul> 059 * </ul> 060 */ 061class MonitorImpl 062 implements Monitor 063{ 064 private static final Logger LOGGER = Logger.getLogger(MonitorImpl.class); 065 private final Handler handler = new Handler(); 066 067 protected static final Util.MemoryInfo MEMORY_INFO = Util.getMemoryInfo(); 068 069 private static final Actor ACTOR = new Actor(); 070 071 static { 072 // Create and start thread for actor. 073 // 074 // Actor is shared between all servers. This reduces concurrency, but 075 // not a concern because monitoring events are not very numerous. 076 // We tried creating one actor (and therefore thread) per server, but 077 // some applications (and in particular some tests) create lots of 078 // servers. 079 // 080 // The actor is shut down with the JVM. 081 final Thread thread = new Thread(ACTOR, "Mondrian Monitor"); 082 thread.setDaemon(true); 083 thread.start(); 084 } 085 086 /** 087 * Creates a Monitor. 088 */ 089 public MonitorImpl() { 090 } 091 092 // Commands 093 094 public void shutdown() { 095 // Nothing to do. Cannot shut down actor, because it shared between 096 // all servers. 097 } 098 099 public void sendEvent(Event event) { 100 // The implementation does not need to take any locks. 101 try { 102 if (Thread.interrupted()) { 103 // Interrupt should not happen. Mondrian uses cancel without 104 // setting interrupt. But if interrupts are happening, it's 105 // best to know now, rather than failing next time we make a 106 // blocking system call. 107 throw new AssertionError(); 108 } 109 ACTOR.eventQueue.put(Pair.<Handler, Message>of(handler, event)); 110 } catch (InterruptedException e) { 111 throw Util.newError(e, "Exception while sending event " + event); 112 } 113 } 114 115 public ServerInfo getServer() { 116 return (ServerInfo) execute(new ServerCommand()); 117 } 118 119 public List<ConnectionInfo> getConnections() { 120 //noinspection unchecked 121 return (List<ConnectionInfo>) execute(new ConnectionsCommand()); 122 } 123 124 public List<StatementInfo> getStatements() { 125 //noinspection unchecked 126 return (List<StatementInfo>) execute(new StatementsCommand()); 127 } 128 129 public List<SqlStatementInfo> getSqlStatements() { 130 //noinspection unchecked 131 return (List<SqlStatementInfo>) execute(new SqlStatementsCommand()); 132 } 133 134 private Object execute(Command command) { 135 return ACTOR.execute(handler, command); 136 } 137 138 // Command and response classes 139 140 /** 141 * A kind of message that produces a response. The response may be null, 142 * but even so, it will be stored and the caller must collect it. 143 */ 144 static abstract class Command implements Message { 145 } 146 147 static class StatementsCommand extends Command { 148 public <T> T accept(Visitor<T> visitor) { 149 return ((CommandVisitor<T>) visitor).visit(this); 150 } 151 } 152 153 static class SqlStatementsCommand extends Command { 154 public <T> T accept(Visitor<T> visitor) { 155 return ((CommandVisitor<T>) visitor).visit(this); 156 } 157 } 158 159 static class ConnectionsCommand extends Command { 160 public <T> T accept(Visitor<T> visitor) { 161 return ((CommandVisitor<T>) visitor).visit(this); 162 } 163 } 164 165 static class ServerCommand extends Command { 166 public <T> T accept(Visitor<T> visitor) { 167 return ((CommandVisitor<T>) visitor).visit(this); 168 } 169 } 170 171 static class ShutdownCommand extends Command { 172 public <T> T accept(Visitor<T> visitor) { 173 return ((CommandVisitor<T>) visitor).visit(this); 174 } 175 } 176 177 /** 178 * Extension to {@link Visitor} to allow commands as well as events. 179 * 180 * @param <T> Return type 181 */ 182 static interface CommandVisitor<T> extends Visitor<T> { 183 T visit(ConnectionsCommand connectionsCommand); 184 T visit(ServerCommand serverCommand); 185 T visit(SqlStatementsCommand command); 186 T visit(StatementsCommand command); 187 T visit(ShutdownCommand command); 188 } 189 190 /** 191 * Workspace to collect statistics about the execution of a Mondrian server. 192 */ 193 private static class MutableServerInfo { 194 private final MutableSqlStatementInfo aggSql = 195 new MutableSqlStatementInfo(null, -1, null, null); 196 private final MutableExecutionInfo aggExec = 197 new MutableExecutionInfo(null, -1, null); 198 private final MutableStatementInfo aggStmt = 199 new MutableStatementInfo(null, -1, null); 200 private final MutableConnectionInfo aggConn = 201 new MutableConnectionInfo(null); 202 private final String stack; 203 204 public MutableServerInfo(String stack) { 205 this.stack = stack; 206 } 207 208 public ServerInfo fix() { 209 Util.MemoryInfo.Usage memoryUsage = MEMORY_INFO.get(); 210 return new ServerInfo( 211 stack, 212 aggConn.startCount, 213 aggConn.endCount, 214 aggStmt.startCount, 215 aggStmt.endCount, 216 aggSql.startCount, 217 aggSql.executeCount, 218 aggSql.endCount, 219 aggSql.rowFetchCount, 220 aggSql.executeNanos, 221 aggSql.cellRequestCount, 222 aggExec.cellCacheHitCount, 223 aggExec.cellCacheRequestCount, 224 aggExec.cellCacheMissCount, 225 aggExec.cellCachePendingCount, 226 aggExec.startCount, 227 aggExec.endCount, 228 memoryUsage.getUsed(), 229 memoryUsage.getCommitted(), 230 memoryUsage.getMax(), 231 (aggExec.cellCacheSegmentCreateCount 232 - aggExec.cellCacheSegmentDeleteCount), 233 aggExec.cellCacheSegmentCreateCount, 234 aggExec.cellCacheSegmentCreateViaExternalCount, 235 aggExec.cellCacheSegmentDeleteViaExternalCount, 236 aggExec.cellCacheSegmentCreateViaRollupCount, 237 aggExec.cellCacheSegmentCreateViaSqlCount, 238 aggExec.cellCacheSegmentCellCount, 239 aggExec.cellCacheSegmentCoordinateSum); 240 } 241 } 242 243 /** 244 * Workspace to collect statistics about the execution of a Mondrian MDX 245 * statement. Parent context is the server. 246 */ 247 private static class MutableConnectionInfo { 248 private final MutableExecutionInfo aggExec = 249 new MutableExecutionInfo(null, -1, null); 250 private final MutableStatementInfo aggStmt = 251 new MutableStatementInfo(null, -1, null); 252 private int startCount; 253 private int endCount; 254 private final String stack; 255 256 public MutableConnectionInfo(String stack) { 257 this.stack = stack; 258 } 259 260 public ConnectionInfo fix() { 261 return new ConnectionInfo( 262 stack, 263 aggExec.cellCacheHitCount, 264 aggExec.cellCacheRequestCount, 265 aggExec.cellCacheMissCount, 266 aggExec.cellCachePendingCount, 267 aggStmt.startCount, 268 aggStmt.endCount, 269 aggExec.startCount, 270 aggExec.endCount); 271 } 272 } 273 274 /** 275 * Workspace to collect statistics about the execution of a Mondrian MDX 276 * statement. Parent context is the connection. 277 */ 278 private static class MutableStatementInfo { 279 private final MutableConnectionInfo conn; 280 private final long statementId; 281 private final MutableExecutionInfo aggExec = 282 new MutableExecutionInfo(null, -1, null); 283 private final MutableSqlStatementInfo aggSql = 284 new MutableSqlStatementInfo(null, -1, null, null); 285 private int startCount; 286 private int endCount; 287 private final String stack; 288 289 public MutableStatementInfo( 290 MutableConnectionInfo conn, 291 long statementId, 292 String stack) 293 { 294 this.statementId = statementId; 295 this.conn = conn; 296 this.stack = stack; 297 } 298 299 public StatementInfo fix() { 300 return new StatementInfo( 301 stack, 302 statementId, 303 aggExec.startCount, 304 aggExec.endCount, 305 aggExec.phaseCount, 306 aggExec.cellCacheRequestCount, 307 aggExec.cellCacheHitCount, 308 aggExec.cellCacheMissCount, 309 aggExec.cellCachePendingCount, 310 aggSql.startCount, 311 aggSql.executeCount, 312 aggSql.endCount, 313 aggSql.rowFetchCount, 314 aggSql.executeNanos, 315 aggSql.cellRequestCount); 316 } 317 } 318 319 /** 320 * <p>Workspace to collect statistics about the execution of a Mondrian MDX 321 * statement. A statement execution occurs within the context of a 322 * statement.</p> 323 * 324 * <p>Most statements are executed only once. It is possible 325 * (if you use the {@link org.olap4j.PreparedOlapStatement} API for 326 * instance) to execute a statement more than once. There can be at most 327 * one execution at a time for a given statement. Thus a statement's 328 * executeStartCount and executeEndCount should never differ by more than 329 * 1.</p> 330 */ 331 private static class MutableExecutionInfo { 332 private final MutableStatementInfo stmt; 333 private final long executionId; 334 private final MutableSqlStatementInfo aggSql = 335 new MutableSqlStatementInfo(null, -1, null, null); 336 private int startCount; 337 private int phaseCount; 338 private int endCount; 339 private int cellCacheRequestCount; 340 private int cellCacheHitCount; 341 private int cellCacheMissCount; 342 private int cellCachePendingCount; 343 private int cellCacheHitCountDelta; 344 private int cellCacheMissCountDelta; 345 private int cellCachePendingCountDelta; 346 private int cellCacheSegmentCreateCount; 347 private int cellCacheSegmentCreateViaRollupCount; 348 private int cellCacheSegmentCreateViaSqlCount; 349 private int cellCacheSegmentCreateViaExternalCount; 350 private int cellCacheSegmentDeleteViaExternalCount; 351 private int cellCacheSegmentDeleteCount; 352 private int cellCacheSegmentCoordinateSum; 353 private int cellCacheSegmentCellCount; 354 private final String stack; 355 356 public MutableExecutionInfo( 357 MutableStatementInfo stmt, 358 long executionId, 359 String stack) 360 { 361 this.stmt = stmt; 362 this.executionId = executionId; 363 this.stack = stack; 364 } 365 366 public ExecutionInfo fix() { 367 return new ExecutionInfo( 368 stack, 369 executionId, 370 phaseCount, 371 cellCacheRequestCount, 372 cellCacheHitCount, 373 cellCacheMissCount, 374 cellCachePendingCount, 375 aggSql.startCount, 376 aggSql.executeCount, 377 aggSql.endCount, 378 aggSql.rowFetchCount, 379 aggSql.executeNanos, 380 aggSql.cellRequestCount); 381 } 382 } 383 384 /** 385 * Workspace to collect statistics about the execution of a SQL statement. 386 * A SQL statement execution occurs within the context of a Mondrian MDX 387 * statement. 388 */ 389 private static class MutableSqlStatementInfo { 390 private final MutableStatementInfo stmt; // parent context 391 private final long sqlStatementId; 392 private int startCount; 393 private int executeCount; 394 private int endCount; 395 private int cellRequestCount; 396 private long executeNanos; 397 private long rowFetchCount; 398 private final String stack; 399 private final String sql; 400 401 public MutableSqlStatementInfo( 402 MutableStatementInfo stmt, 403 long sqlStatementId, 404 String sql, 405 String stack) 406 { 407 this.sqlStatementId = sqlStatementId; 408 this.stmt = stmt; 409 this.sql = sql; 410 this.stack = stack; 411 } 412 413 public SqlStatementInfo fix() { 414 return new SqlStatementInfo( 415 stack, 416 sqlStatementId, 417 sql); 418 } 419 } 420 421 private static class Handler implements CommandVisitor<Object> { 422 423 private final MutableServerInfo server = 424 new MutableServerInfo(null); 425 426 private final Map<Integer, MutableConnectionInfo> connectionMap = 427 new LinkedHashMap<Integer, MutableConnectionInfo>( 428 MondrianProperties.instance().ExecutionHistorySize.get(), 429 0.8f, 430 false) 431 { 432 private final int maxSize = 433 MondrianProperties.instance().ExecutionHistorySize.get(); 434 private static final long serialVersionUID = 1L; 435 protected boolean removeEldestEntry( 436 Map.Entry<Integer, MutableConnectionInfo> e) 437 { 438 if (size() > maxSize) { 439 if (RolapUtil.MONITOR_LOGGER.isTraceEnabled()) { 440 RolapUtil.MONITOR_LOGGER.trace( 441 "ConnectionInfo(" 442 + e.getKey() 443 + ") evicted. Stack is:" 444 + Util.nl 445 + e.getValue().stack); 446 } 447 return true; 448 } 449 return false; 450 } 451 }; 452 453 private final Map<Long, MutableSqlStatementInfo> sqlStatementMap = 454 new LinkedHashMap<Long, MutableSqlStatementInfo>( 455 MondrianProperties.instance().ExecutionHistorySize.get(), 456 0.8f, 457 false) 458 { 459 private final int maxSize = 460 MondrianProperties.instance().ExecutionHistorySize.get(); 461 private static final long serialVersionUID = 1L; 462 protected boolean removeEldestEntry( 463 Map.Entry<Long, MutableSqlStatementInfo> e) 464 { 465 if (size() > maxSize) { 466 if (RolapUtil.MONITOR_LOGGER.isTraceEnabled()) { 467 RolapUtil.MONITOR_LOGGER.trace( 468 "StatementInfo(" 469 + e.getKey() 470 + ") evicted. Stack is:" 471 + Util.nl 472 + e.getValue().stack); 473 } 474 return true; 475 } 476 return false; 477 } 478 }; 479 480 private final Map<Long, MutableStatementInfo> statementMap = 481 new LinkedHashMap<Long, MutableStatementInfo>( 482 MondrianProperties.instance().ExecutionHistorySize.get(), 483 0.8f, 484 false) 485 { 486 private final int maxSize = 487 MondrianProperties.instance().ExecutionHistorySize.get(); 488 private static final long serialVersionUID = 1L; 489 protected boolean removeEldestEntry( 490 Map.Entry<Long, MutableStatementInfo> e) 491 { 492 if (size() > maxSize) { 493 if (RolapUtil.MONITOR_LOGGER.isTraceEnabled()) { 494 RolapUtil.MONITOR_LOGGER.trace( 495 "StatementInfo(" 496 + e.getKey() 497 + ") evicted. Stack is:" 498 + Util.nl 499 + e.getValue().stack); 500 } 501 return true; 502 } 503 return false; 504 } 505 }; 506 507 private final Map<Long, MutableExecutionInfo> executionMap = 508 new LinkedHashMap<Long, MutableExecutionInfo>( 509 MondrianProperties.instance().ExecutionHistorySize.get(), 510 0.8f, 511 false) 512 { 513 private final int maxSize = 514 MondrianProperties.instance().ExecutionHistorySize.get(); 515 private static final long serialVersionUID = 1L; 516 protected boolean removeEldestEntry( 517 Map.Entry<Long, MutableExecutionInfo> e) 518 { 519 if (size() > maxSize) { 520 if (RolapUtil.MONITOR_LOGGER.isTraceEnabled()) { 521 RolapUtil.MONITOR_LOGGER.trace( 522 "ExecutionInfo(" 523 + e.getKey() 524 + ") evicted. Stack is:" 525 + Util.nl 526 + e.getValue().stack); 527 } 528 return true; 529 } 530 return false; 531 } 532 }; 533 534 /** 535 * Holds info for executions that have ended. Cell cache events may 536 * arrive late, and this map lets them get into the system. 537 */ 538 private final Map<Long, MutableExecutionInfo> retiredExecutionMap = 539 new LinkedHashMap<Long, MutableExecutionInfo>( 540 MondrianProperties.instance().ExecutionHistorySize.get(), 541 0.8f, 542 false) 543 { 544 private final int maxSize = 545 MondrianProperties.instance().ExecutionHistorySize.get(); 546 private static final long serialVersionUID = 1L; 547 protected boolean removeEldestEntry( 548 Map.Entry<Long, MutableExecutionInfo> e) 549 { 550 if (size() > maxSize) { 551 if (RolapUtil.MONITOR_LOGGER.isTraceEnabled()) { 552 RolapUtil.MONITOR_LOGGER.trace( 553 "Retired ExecutionInfo(" 554 + e.getKey() 555 + ") evicted. Stack is:" 556 + Util.nl 557 + e.getValue().stack); 558 } 559 return true; 560 } 561 return false; 562 } 563 }; 564 565 /** 566 * Method for debugging that does nothing, but is a place to put a break 567 * point to find out places where an event or its parent should be 568 * registered but is not. 569 * 570 * @param event Event 571 * @return Always null 572 */ 573 private Object missing(Event event) { 574 return null; 575 } 576 577 public Object visit(ConnectionStartEvent event) { 578 final MutableConnectionInfo conn = 579 new MutableConnectionInfo(event.stack); 580 connectionMap.put(event.connectionId, conn); 581 foo(conn, event); 582 foo(server.aggConn, event); 583 if (RolapUtil.MONITOR_LOGGER.isTraceEnabled()) { 584 RolapUtil.MONITOR_LOGGER.trace( 585 "Connection(" 586 + event.connectionId 587 + ") created. stack is:" 588 + Util.nl 589 + event.stack); 590 } 591 return null; 592 } 593 594 private void foo( 595 MutableConnectionInfo conn, 596 ConnectionStartEvent event) 597 { 598 ++conn.startCount; 599 } 600 601 public Object visit(ConnectionEndEvent event) { 602 final MutableConnectionInfo conn = 603 connectionMap.remove(event.connectionId); 604 if (conn == null) { 605 return missing(event); 606 } 607 foo(conn, event); 608 foo(server.aggConn, event); 609 610 // Since the connection info will no longer be in the table, 611 // broadcast the final info to anyone who is interested. 612 RolapUtil.MONITOR_LOGGER.debug(conn.fix()); 613 return null; 614 } 615 616 private void foo( 617 MutableConnectionInfo conn, 618 ConnectionEndEvent event) 619 { 620 ++conn.endCount; 621 } 622 623 public Object visit(StatementStartEvent event) { 624 final MutableConnectionInfo conn = 625 connectionMap.get(event.connectionId); 626 if (conn == null) { 627 return missing(event); 628 } 629 final MutableStatementInfo stmt = 630 new MutableStatementInfo( 631 conn, event.statementId, event.stack); 632 statementMap.put(event.statementId, stmt); 633 foo(stmt, event); 634 foo(conn.aggStmt, event); 635 foo(server.aggStmt, event); 636 if (RolapUtil.MONITOR_LOGGER.isTraceEnabled()) { 637 RolapUtil.MONITOR_LOGGER.trace( 638 "Statement(" 639 + event.statementId 640 + ") created. stack is:" 641 + Util.nl 642 + event.stack); 643 } 644 return null; 645 } 646 647 private void foo( 648 MutableStatementInfo stmt, 649 StatementStartEvent event) 650 { 651 ++stmt.startCount; 652 } 653 654 public Object visit(StatementEndEvent event) { 655 final MutableStatementInfo stmt = 656 statementMap.remove(event.statementId); 657 if (stmt == null) { 658 return missing(event); 659 } 660 foo(stmt, event); 661 foo(stmt.conn.aggStmt, event); 662 foo(server.aggStmt, event); 663 664 // Since the statement info will no longer be in the table, 665 // broadcast the final info to anyone who is interested. 666 RolapUtil.MONITOR_LOGGER.debug(stmt.fix()); 667 return null; 668 } 669 670 private void foo( 671 MutableStatementInfo stmt, 672 StatementEndEvent event) 673 { 674 ++stmt.endCount; 675 } 676 677 public Object visit(ExecutionStartEvent event) { 678 MutableStatementInfo stmt = 679 statementMap.get(event.statementId); 680 if (stmt == null) { 681 return missing(event); 682 } 683 final MutableExecutionInfo exec = 684 new MutableExecutionInfo( 685 stmt, event.executionId, event.stack); 686 executionMap.put(event.executionId, exec); 687 688 foo(exec, event); 689 foo(stmt.aggExec, event); 690 foo(stmt.conn.aggExec, event); 691 foo(server.aggExec, event); 692 if (RolapUtil.MONITOR_LOGGER.isTraceEnabled()) { 693 RolapUtil.MONITOR_LOGGER.trace( 694 "Execution(" 695 + event.executionId 696 + ") created. stack is:" 697 + Util.nl 698 + event.stack); 699 } 700 return null; 701 } 702 703 private void foo( 704 MutableExecutionInfo exec, 705 ExecutionStartEvent event) 706 { 707 ++exec.startCount; 708 } 709 710 public Object visit(ExecutionPhaseEvent event) { 711 final MutableExecutionInfo exec = 712 executionMap.get(event.executionId); 713 if (exec == null) { 714 return missing(event); 715 } 716 executionMap.put(event.executionId, exec); 717 718 foo(exec, event); 719 foo(exec.stmt.aggExec, event); 720 foo(exec.stmt.conn.aggExec, event); 721 foo(server.aggExec, event); 722 return null; 723 } 724 725 private void foo( 726 MutableExecutionInfo exec, 727 ExecutionPhaseEvent event) 728 { 729 ++exec.phaseCount; 730 exec.cellCacheHitCountDelta = event.hitCount; 731 exec.cellCacheMissCountDelta = event.missCount; 732 exec.cellCachePendingCountDelta = event.pendingCount; 733 } 734 735 public Object visit(ExecutionEndEvent event) { 736 final MutableExecutionInfo exec = 737 executionMap.remove(event.executionId); 738 if (exec == null) { 739 return missing(event); 740 } 741 retiredExecutionMap.put(exec.executionId, exec); 742 foo(exec, event); 743 foo(exec.stmt.aggExec, event); 744 foo(exec.stmt.conn.aggExec, event); 745 foo(server.aggExec, event); 746 747 // Since the execution info will no longer be in the table, 748 // broadcast the final info to anyone who is interested. 749 RolapUtil.MONITOR_LOGGER.debug(exec.fix()); 750 return null; 751 } 752 753 private void foo( 754 MutableExecutionInfo exec, 755 ExecutionEndEvent event) 756 { 757 // NOTE: 'exec.phaseCount += event.phaseCount' would be wrong, 758 // because we have already incremented each time we got an 759 // ExecutionPhaseEvent. For a similar reason, we do not update 760 // exec.cellCacheHitCount etc. each phase. 761 762 ++exec.endCount; 763 ++exec.phaseCount; 764 exec.cellCacheHitCount += event.cellCacheHitCount; 765 exec.cellCacheMissCount += event.cellCacheMissCount; 766 exec.cellCachePendingCount += event.cellCachePendingCount; 767 exec.cellCacheRequestCount += 768 (event.cellCacheHitCount 769 + event.cellCacheMissCount 770 + event.cellCachePendingCount); 771 exec.cellCacheHitCountDelta = 0; 772 exec.cellCacheMissCountDelta = 0; 773 exec.cellCachePendingCountDelta = 0; 774 } 775 776 public Object visit(CellCacheSegmentCreateEvent event) { 777 MutableExecutionInfo exec = 778 executionMap.get(event.executionId); 779 if (exec == null) { 780 // Cache events can sometimes arrive after the execution has 781 // ended. So, look into the retired map. 782 exec = retiredExecutionMap.get(event.executionId); 783 if (exec == null) { 784 return missing(event); 785 } 786 } 787 788 foo(exec, event); 789 foo(exec.stmt.aggExec, event); 790 foo(exec.stmt.conn.aggExec, event); 791 foo(server.aggExec, event); 792 return null; 793 } 794 795 private void foo( 796 MutableExecutionInfo exec, 797 CellCacheSegmentCreateEvent event) 798 { 799 ++exec.cellCacheSegmentCreateCount; 800 exec.cellCacheSegmentCoordinateSum += event.coordinateCount; 801 exec.cellCacheSegmentCellCount += event.actualCellCount; 802 switch (event.source) { 803 case ROLLUP: 804 ++exec.cellCacheSegmentCreateViaRollupCount; 805 break; 806 case EXTERNAL: 807 ++exec.cellCacheSegmentCreateViaExternalCount; 808 break; 809 case SQL: 810 ++exec.cellCacheSegmentCreateViaSqlCount; 811 break; 812 default: 813 throw Util.unexpected(event.source); 814 } 815 } 816 817 public Object visit(CellCacheSegmentDeleteEvent event) { 818 final MutableExecutionInfo exec = 819 executionMap.get(event.executionId); 820 if (exec == null) { 821 return missing(event); 822 } 823 824 foo(exec, event); 825 foo(exec.stmt.aggExec, event); 826 foo(exec.stmt.conn.aggExec, event); 827 foo(server.aggExec, event); 828 return null; 829 } 830 831 private void foo( 832 MutableExecutionInfo exec, 833 CellCacheSegmentDeleteEvent event) 834 { 835 ++exec.cellCacheSegmentDeleteCount; 836 exec.cellCacheSegmentCoordinateSum -= event.coordinateCount; 837 switch (event.source) { 838 case EXTERNAL: 839 ++exec.cellCacheSegmentDeleteViaExternalCount; 840 break; 841 } 842 } 843 844 public Object visit(SqlStatementStartEvent event) { 845 final MutableStatementInfo stmt = 846 statementMap.get( 847 event.getStatementId()); 848 if (stmt == null) { 849 return missing(event); 850 } 851 final MutableSqlStatementInfo sql = 852 new MutableSqlStatementInfo( 853 stmt, 854 event.sqlStatementId, 855 event.sql, 856 event.stack); 857 sqlStatementMap.put(event.sqlStatementId, sql); 858 foo(sql, event); 859 foo(sql.stmt.aggSql, event); 860 foo(server.aggSql, event); 861 if (RolapUtil.MONITOR_LOGGER.isTraceEnabled()) { 862 RolapUtil.MONITOR_LOGGER.trace( 863 "SqlStatement(" 864 + event.sqlStatementId 865 + ") created. stack is:" 866 + Util.nl 867 + event.stack); 868 } 869 return null; 870 } 871 872 private void foo( 873 MutableSqlStatementInfo sql, 874 SqlStatementStartEvent event) 875 { 876 ++sql.startCount; 877 sql.cellRequestCount += event.cellRequestCount; 878 } 879 880 public Object visit(SqlStatementExecuteEvent event) { 881 final MutableSqlStatementInfo sql = 882 sqlStatementMap.get(event.sqlStatementId); 883 if (sql == null) { 884 return missing(event); 885 } 886 foo(sql, event); 887 foo(sql.stmt.aggSql, event); 888 foo(server.aggSql, event); 889 return null; 890 } 891 892 private void foo( 893 MutableSqlStatementInfo sql, 894 SqlStatementExecuteEvent event) 895 { 896 ++sql.executeCount; 897 sql.executeNanos += event.executeNanos; 898 } 899 900 public Object visit(SqlStatementEndEvent event) { 901 final MutableSqlStatementInfo sql = 902 sqlStatementMap.remove(event.sqlStatementId); 903 if (sql == null) { 904 return missing(event); 905 } 906 foo(sql, event); 907 foo(sql.stmt.aggSql, event); 908 foo(server.aggSql, event); 909 910 // Since the SQL statement info will no longer be in the table, 911 // broadcast the final info to anyone who is interested. 912 RolapUtil.MONITOR_LOGGER.debug(sql.fix()); 913 return null; 914 } 915 916 private void foo( 917 MutableSqlStatementInfo sql, 918 SqlStatementEndEvent event) 919 { 920 ++sql.endCount; 921 sql.rowFetchCount += event.rowFetchCount; 922 } 923 924 public Object visit(ConnectionsCommand connectionsCommand) { 925 List<ConnectionInfo> list = 926 new ArrayList<ConnectionInfo>(); 927 for (MutableConnectionInfo info : connectionMap.values()) { 928 list.add(info.fix()); 929 } 930 return list; 931 } 932 933 public Object visit(ServerCommand serverCommand) { 934 return server.fix(); 935 } 936 937 public Object visit(SqlStatementsCommand command) { 938 List<SqlStatementInfo> list = 939 new ArrayList<SqlStatementInfo>(); 940 for (MutableSqlStatementInfo info : sqlStatementMap.values()) { 941 list.add(info.fix()); 942 } 943 return list; 944 } 945 946 public Object visit(StatementsCommand command) { 947 List<StatementInfo> list = 948 new ArrayList<StatementInfo>(); 949 for (MutableStatementInfo info : statementMap.values()) { 950 list.add(info.fix()); 951 } 952 return list; 953 } 954 955 public Object visit(ShutdownCommand command) { 956 return "Shutdown succeeded"; 957 } 958 } 959 960 private static class Actor implements Runnable { 961 private boolean running = true; 962 963 private final BlockingQueue<Pair<Handler, Message>> eventQueue = 964 new ArrayBlockingQueue<Pair<Handler, Message>>(1000); 965 966 private final BlockingHashMap<Command, Object> responseMap = 967 new BlockingHashMap<Command, Object>(1000); 968 969 public void run() { 970 try { 971 for (;;) { 972 try { 973 final Pair<Handler, Message> entry = eventQueue.take(); 974 final Handler handler = entry.left; 975 final Message message = entry.right; 976 final Object result = message.accept(handler); 977 if (message instanceof Command) { 978 responseMap.put((Command) message, result); 979 } else { 980 // Broadcast the event to anyone who is interested. 981 RolapUtil.MONITOR_LOGGER.debug(message); 982 } 983 if (message instanceof ShutdownCommand) { 984 LOGGER.debug( 985 "ShutdownCommand received. Monitor thread is shutting down."); 986 return; 987 } 988 } catch (InterruptedException e) { 989 Thread.currentThread().interrupt(); 990 LOGGER.warn( 991 "Monitor thread interrupted.", 992 e); 993 return; 994 } catch (Throwable t) { 995 LOGGER.error( 996 "Runtime error on the monitor thread.", 997 t); 998 } 999 } 1000 } finally { 1001 running = false; 1002 } 1003 } 1004 1005 public void shutdown() { 1006 // No point sending a command if (for some reason) there's no thread 1007 // listening to the command queue. 1008 if (running) { 1009 execute(null, new ShutdownCommand()); 1010 } 1011 } 1012 1013 Object execute(Handler handler, Command command) { 1014 try { 1015 eventQueue.put(Pair.<Handler, Message>of(handler, command)); 1016 } catch (InterruptedException e) { 1017 throw Util.newError(e, "Interrupted while sending " + command); 1018 } 1019 try { 1020 return responseMap.get(command); 1021 } catch (InterruptedException e) { 1022 throw Util.newError(e, "Interrupted while awaiting " + command); 1023 } 1024 } 1025 } 1026} 1027 1028// End MonitorImpl.java