/*
// This software is subject to the terms of the Eclipse Public License v1.0
// Agreement, available at the following URL:
// http://www.eclipse.org/legal/epl-v10.html.
// You must accept the terms of that agreement to use this software.
//
// Copyright (C) 2011-2013 Pentaho
// All Rights Reserved.
*/
package mondrian.rolap.agg;

import mondrian.olap.*;
import mondrian.olap.CacheControl.CellRegion;
import mondrian.resource.MondrianResource;
import mondrian.rolap.*;
import mondrian.rolap.cache.*;
import mondrian.server.Execution;
import mondrian.server.Locus;
import mondrian.server.monitor.*;
import mondrian.spi.*;
import mondrian.util.*;

import org.apache.log4j.Logger;

import java.io.PrintWriter;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;

/**
 * Active object that maintains the "global cache" (in JVM, but shared between
 * connections using a particular schema) and "external cache" (as implemented
 * by a {@link mondrian.spi.SegmentCache}).
 *
 * <p>Segment states</p>
 *
 * <table>
 *     <tr><th>State</th><th>Meaning</th></tr>
 *     <tr><td>Local</td><td>Initial state of a segment</td></tr>
 * </table>
 *
 * <h2>Decisions to be reviewed</h2>
 *
 * <p>1. Create variant of actor that processes all requests synchronously,
 * and does not need a thread. This would be a more 'embedded' mode of operation
 * (albeit with worse scale-out).</p>
 *
 * <p>2. Move functionality into AggregationManager?</p>
 *
 * <p>3.
 * Delete {@link mondrian.rolap.RolapStar#lookupOrCreateAggregation}
 * and {@link mondrian.rolap.RolapStar#lookupSegment}
 * and {@link mondrian.rolap.RolapStar}.lookupAggregationShared
 * (formerly RolapStar.lookupAggregation).</p>
 *
 *
 * <h2>Moved methods</h2>
 *
 * <p>(Keeping track of where methods came from will make it easier to merge
 * to the mondrian-4 code line.)</p>
 *
 * <p>1. {@link mondrian.rolap.RolapStar#getCellFromCache} moved from
 * {@link Aggregation}.getCellValue</p>
 *
 *
 * <h2>Done</h2>
 *
 * <p>1. Obsolete CountingAggregationManager, and property
 * mondrian.rolap.agg.enableCacheHitCounters.</p>
 *
 * <p>2. AggregationManager becomes non-singleton.</p>
 *
 * <p>3. SegmentCacheWorker methods and segmentCache field become
 * non-static. initCache() is called on construction. SegmentCache is passed
 * into constructor (therefore move ServiceDiscovery into
 * client). AggregationManager (or maybe MondrianServer) is another constructor
 * parameter.</p>
 *
 * <p>5. Move SegmentHeader, SegmentBody, ConstrainedColumn into
 * mondrian.spi. Leave behind dependencies on mondrian.rolap.agg. In particular,
 * put code that converts Segment + SegmentWithData to and from SegmentHeader
 * + SegmentBody (e.g. {@link SegmentHeader}#forSegment) into a utility class.
 * (Do this as CLEANUP, after functionality is complete?)</p>
 *
 * <p>6. Move functionality from Aggregation to Segment. Long-term, Aggregation
 * should not be used as a 'gatekeeper' to Segment. Remove Aggregation fields
 * columns and axes.</p>
 *
 * <p>9. Obsolete {@link RolapStar#cacheAggregations}. Similar effect will be
 * achieved by removing the 'jvm cache' from the chain of caches.</p>
 *
 * <p>10. Rename Aggregation.Axis to SegmentAxis.</p>
 *
 * <p>11. Remove Segment.setData and instead split out subclass
 * SegmentWithData.
 * Now segment is immutable. You don't have to wait for its
 * state to change. You wait for a {@code Future<SegmentWithData>} to become
 * ready.</p>
 *
 * <p>12. Remove methods: RolapCube.checkAggregateModifications,
 * RolapStar.checkAggregateModifications,
 * RolapSchema.checkAggregateModifications,
 * RolapStar.pushAggregateModificationsToGlobalCache,
 * RolapSchema.pushAggregateModificationsToGlobalCache,
 * RolapCube.pushAggregateModificationsToGlobalCache.</p>
 *
 * <p>13. Add new implementations of Future: CompletedFuture and SlotFuture.</p>
 *
 * <p>14. Remove methods:</p>
 * <ul>
 *
 * <li>Remove {@link SegmentLoader}.loadSegmentsFromCache - creates a
 *   {@link SegmentHeader} that has PRECISELY the same specification as the
 *   requested segment, very unlikely to have a hit</li>
 *
 * <li>Remove {@link SegmentLoader}.loadSegmentFromCacheRollup</li>
 *
 * <li>Break up {@link SegmentLoader}.cacheSegmentData, and
 *   place code that is called after a segment has arrived</li>
 *
 * </ul>
 *
 * <p>13. Fix flush. Obsolete {@link Aggregation}.flush, and
 * {@link RolapStar}.flush, which called it.</p>
 *
 * <p>18. {@code SegmentCacheManager#locateHeaderBody} (and maybe other
 * methods) call {@link SegmentCacheWorker#get}, and that's a slow blocking
 * call. Waits for segment futures should be made from a worker or
 * client, not an agent.</p>
 *
 *
 * <h2>Ideas and tasks</h2>
 *
 * <p>7. RolapStar.localAggregations and .sharedAggregations. Obsolete
 * sharedAggregations.</p>
 *
 * <p>8. Longer term. Move {@link mondrian.rolap.RolapStar.Bar}.segmentRefs to
 * {@link mondrian.server.Execution}. Would it still be thread-local?</p>
 *
 * <p>10. Call
 * {@link mondrian.spi.DataSourceChangeListener#isAggregationChanged}.
 * Previously called from
 * {@link RolapStar}.checkAggregateModifications, now never called.</p>
 *
 * <p>12.
 * We can quickly identify segments affected by a flush using
 * {@link SegmentCacheIndex#intersectRegion}. But then what? Options:</p>
 *
 * <ol>
 *
 * <li>Option #1. Pull them in, trim them, write them out? But: causes
 *     a lot of I/O, and we may never use these
 *     segments. Easiest.</li>
 *
 * <li>Option #2. Mark the segments in the index as needing to be trimmed; trim
 *     them when read, and write out again. But: doesn't propagate to other
 *     nodes.</li>
 *
 * <li>Option #3. (Best?) Write a mapping SegmentHeader->Restrictions into the
 *     cache. Less I/O than #1. Method
 *     "SegmentCache.addRestriction(SegmentHeader, CacheRegion)"</li>
 *
 * </ol>
 *
 * <p>14. Move {@link AggregationManager#getCellFromCache} somewhere else.
 * It's concerned with local segments, not the global/external cache.</p>
 *
 * <p>15. Method to convert SegmentHeader + SegmentBody to Segment +
 * SegmentWithData is imperfect. Cannot parse predicates, compound predicates.
 * Need mapping in star to do it properly and efficiently?
 * {@link mondrian.rolap.agg.SegmentBuilder.SegmentConverter} is a hack that
 * can be removed when this is fixed.
 * See {@link SegmentBuilder#toSegment}. Also see #20.</p>
 *
 * <p>17. Revisit the strategy for finding segments that can be copied from
 * global and external cache into local cache. The strategy of sending N
 * {@link CellRequest}s at a time, then executing SQL to fill in the gaps, is
 * flawed. We need to maximize N in order to reduce segment fragmentation, but
 * if too high, we blow memory. BasicQueryTest.testAnalysis is an example of
 * this. Instead, we should send cell-requests in batches (is ~1000 the right
 * size?), identify those that can be answered from global or external cache,
 * return those segments, but not execute SQL until the end of the phase.
 * If so, {@link CellRequestQuantumExceededException} can be obsoleted.</p>
 *
 * <p>19. Tracing.
 * a. Remove or re-purpose {@link FastBatchingCellReader#pendingCount};
 * b. Add counter to measure requests satisfied by calling
 * {@link mondrian.rolap.agg.SegmentCacheManager#peek}.</p>
 *
 * <p>20. Obsolete {@link SegmentDataset} and its implementing classes.
 * {@link SegmentWithData} can use {@link SegmentBody} instead. Will save
 * copying.</p>
 *
 * <p>21. Obsolete {@link mondrian.util.CombiningGenerator}.</p>
 *
 * <p>22. {@link SegmentHeader#constrain(mondrian.spi.SegmentColumn[])} is
 * broken for N-dimensional regions where N &gt; 1. Each call currently
 * creates N more 1-dimensional regions, but should create 1 more N-dimensional
 * region. {@link SegmentHeader#excludedRegions} should be a list of
 * {@link SegmentColumn} arrays.</p>
 *
 * <p>23. All code that calls {@link Future#get} should probably handle
 * {@link CancellationException}.</p>
 *
 * <p>24. Obsolete {@link #handler}. Indirection doesn't win anything.</p>
 *
 *
 * @author jhyde
 */
public class SegmentCacheManager {
    private final Handler handler = new Handler();
    private final Actor ACTOR;
    public final Thread thread;

    /**
     * Executor with which to send requests to external caches.
     */
    public final ExecutorService cacheExecutor =
        Util.getExecutorService(
            MondrianProperties.instance()
                .SegmentCacheManagerNumberCacheThreads.get(),
            0, 1,
            "mondrian.rolap.agg.SegmentCacheManager$cacheExecutor",
            new RejectedExecutionHandler() {
                public void rejectedExecution(
                    Runnable r,
                    ThreadPoolExecutor executor)
                {
                    throw MondrianResource.instance()
                        .SegmentCacheLimitReached.ex();
                }
            });

    /**
     * Executor with which to execute SQL requests.
     *
     * <p>TODO: create using factory and/or configuration parameters.
     * Executor
     * should be shared within MondrianServer or target JDBC database.
     */
    public final ExecutorService sqlExecutor =
        Util.getExecutorService(
            MondrianProperties.instance()
                .SegmentCacheManagerNumberSqlThreads.get(),
            0, 1,
            "mondrian.rolap.agg.SegmentCacheManager$sqlExecutor",
            new RejectedExecutionHandler() {
                public void rejectedExecution(
                    Runnable r,
                    ThreadPoolExecutor executor)
                {
                    throw MondrianResource.instance()
                        .SqlQueryLimitReached.ex();
                }
            });

    // NOTE: This list is only mutable for testing purposes. Would rather it
    // were immutable.
    public final List<SegmentCacheWorker> segmentCacheWorkers =
        new CopyOnWriteArrayList<SegmentCacheWorker>();

    public final SegmentCache compositeCache;
    private final SegmentCacheIndexRegistry indexRegistry;

    private static final Logger LOGGER =
        Logger.getLogger(AggregationManager.class);
    private final MondrianServer server;

    public SegmentCacheManager(MondrianServer server) {
        this.server = server;
        ACTOR = new Actor();
        thread = new Thread(
            ACTOR, "mondrian.rolap.agg.SegmentCacheManager$ACTOR");
        thread.setDaemon(true);
        thread.start();

        // Create the index registry.
        this.indexRegistry = new SegmentCacheIndexRegistry();

        // Add a local cache, if needed.
        if (!MondrianProperties.instance().DisableLocalSegmentCache.get()
            && !MondrianProperties.instance().DisableCaching.get())
        {
            final MemorySegmentCache cache = new MemorySegmentCache();
            segmentCacheWorkers.add(
                new SegmentCacheWorker(cache, thread));
        }

        // Add an external cache, if configured.
        final List<SegmentCache> externalCache = SegmentCacheWorker.initCache();
        for (SegmentCache cache : externalCache) {
            // Create a worker for this external cache
            segmentCacheWorkers.add(
                new SegmentCacheWorker(cache, thread));
            // Hook up a listener so it can update
            // the segment index.
            cache.addListener(
                new AsyncCacheListener(this, server));
        }

        compositeCache = new CompositeSegmentCache(segmentCacheWorkers);
    }

    public <T> T execute(Command<T> command) {
        return ACTOR.execute(handler, command);
    }

    public SegmentCacheIndexRegistry getIndexRegistry() {
        return indexRegistry;
    }

    /**
     * Adds a segment to the segment index.
     *
     * <p>Called when a SQL statement has finished loading a segment.</p>
     *
     * <p>Does not add the segment to the external cache. That is a potentially
     * long-duration operation, better carried out by a worker.</p>
     *
     * @param star star whose segment index is updated
     * @param header segment header
     * @param body segment body
     */
    public void loadSucceeded(
        RolapStar star,
        SegmentHeader header,
        SegmentBody body)
    {
        final Locus locus = Locus.peek();
        ACTOR.event(
            handler,
            new SegmentLoadSucceededEvent(
                System.currentTimeMillis(),
                locus.getServer().getMonitor(),
                locus.getServer().getId(),
                locus.execution.getMondrianStatement()
                    .getMondrianConnection().getId(),
                locus.execution.getMondrianStatement().getId(),
                locus.execution.getId(),
                star,
                header,
                body));
    }

    /**
     * Informs cache manager that a segment load failed.
     *
     * <p>Called when a SQL statement receives an error while loading a
     * segment.</p>
     *
     * @param star star whose segment index is updated
     * @param header segment header
     * @param throwable Error
     */
    public void loadFailed(
        RolapStar star,
        SegmentHeader header,
        Throwable throwable)
    {
        final Locus locus = Locus.peek();
        ACTOR.event(
            handler,
            new SegmentLoadFailedEvent(
                System.currentTimeMillis(),
                locus.getServer().getMonitor(),
                locus.getServer().getId(),
                locus.execution.getMondrianStatement()
                    .getMondrianConnection().getId(),
                locus.execution.getMondrianStatement().getId(),
                locus.execution.getId(),
                star,
                header,
                throwable));
    }

    /**
     * Removes a segment from the segment index.
     *
     * <p>Call is asynchronous. It comes back immediately.</p>
     *
     * <p>This method itself does not touch the external cache; the handler
     * for the resulting event submits that removal to
     * {@link #cacheExecutor}.</p>
     *
     * @param star star whose segment index is updated
     * @param header segment header
     */
    public void remove(
        RolapStar star,
        SegmentHeader header)
    {
        final Locus locus = Locus.peek();
        ACTOR.event(
            handler,
            new SegmentRemoveEvent(
                System.currentTimeMillis(),
                locus.getServer().getMonitor(),
                locus.getServer().getId(),
                locus.execution.getMondrianStatement()
                    .getMondrianConnection().getId(),
                locus.execution.getMondrianStatement().getId(),
                locus.execution.getId(),
                this,
                star,
                header));
    }

    /**
     * Tells the cache that a segment is newly available in an external cache.
     */
    public void externalSegmentCreated(
        SegmentHeader header,
        MondrianServer server)
    {
        if (MondrianProperties.instance().DisableCaching.get()) {
            // Ignore cache requests.
            return;
        }
        ACTOR.event(
            handler,
            new ExternalSegmentCreatedEvent(
                System.currentTimeMillis(),
                server.getMonitor(),
                server.getId(),
                0,
                0,
                0,
                this,
                header));
    }

    /**
     * Tells the cache that a segment is no longer available in an external
     * cache.
     */
    public void externalSegmentDeleted(
        SegmentHeader header,
        MondrianServer server)
    {
        if (MondrianProperties.instance().DisableCaching.get()) {
            // Ignore cache requests.
            return;
        }
        ACTOR.event(
            handler,
            new ExternalSegmentDeletedEvent(
                System.currentTimeMillis(),
                server.getMonitor(),
                server.getId(),
                0,
                0,
                0,
                this,
                header));
    }

    public void printCacheState(
        CellRegion region,
        PrintWriter pw,
        Locus locus)
    {
        ACTOR.execute(
            handler,
            new PrintCacheStateCommand(region, pw, locus));
    }

    /**
     * Shuts down this cache manager and all active threads and indexes.
     */
    public void shutdown() {
        execute(new ShutdownCommand());
        cacheExecutor.shutdown();
        sqlExecutor.shutdown();
    }

    public SegmentBuilder.SegmentConverter getConverter(
        RolapStar star,
        SegmentHeader header)
    {
        return indexRegistry.getIndex(star)
            .getConverter(
                header.schemaName,
                header.schemaChecksum,
                header.cubeName,
                header.rolapStarFactTableName,
                header.measureName,
                header.compoundPredicates);
    }

    /**
     * Makes a quick request to the aggregation manager to see whether the
     * cell value required by a particular cell request is in external cache.
     *
     * <p>'Quick' is relative. It is an asynchronous request (due to
     * the aggregation manager being an actor) and therefore somewhat slow. If
     * the segment is in cache, will save batching up future requests and
     * re-executing the query. Win should be particularly noticeable for queries
     * running on a populated cache. Without this feature, every query would
     * require at least two iterations.</p>
     *
     * <p>Request does not issue SQL to populate the segment. Nor does it
     * try to find existing segments for rollup.
     * Those operations can wait until
     * next phase.</p>
     *
     * <p>Client is responsible for adding the segment to its private cache.</p>
     *
     * @param request Cell request
     * @return Segment with data, or null if not in cache
     */
    public SegmentWithData peek(final CellRequest request) {
        final SegmentCacheManager.PeekResponse response =
            execute(
                new PeekCommand(request, Locus.peek()));
        // First pass: return the first segment whose body is already
        // available from the composite cache.
        for (SegmentHeader header : response.headerMap.keySet()) {
            final SegmentBody body = compositeCache.get(header);
            if (body != null) {
                final SegmentBuilder.SegmentConverter converter =
                    response.converterMap.get(
                        SegmentCacheIndexImpl.makeConverterKey(header));
                if (converter != null) {
                    return converter.convert(header, body);
                }
            }
        }
        // Second pass: wait for segments that are still being loaded.
        for (Map.Entry<SegmentHeader, Future<SegmentBody>> entry
            : response.headerMap.entrySet())
        {
            final Future<SegmentBody> bodyFuture = entry.getValue();
            if (bodyFuture != null) {
                final SegmentBody body =
                    Util.safeGet(
                        bodyFuture,
                        "Waiting for segment to load");
                final SegmentHeader header = entry.getKey();
                final SegmentBuilder.SegmentConverter converter =
                    response.converterMap.get(
                        SegmentCacheIndexImpl.makeConverterKey(header));
                if (converter != null) {
                    return converter.convert(header, body);
                }
            }
        }
        return null;
    }

    /**
     * Visitor for messages (commands and events).
     */
    public interface Visitor {
        void visit(SegmentLoadSucceededEvent event);
        void visit(SegmentLoadFailedEvent event);
        void visit(SegmentRemoveEvent event);
        void visit(ExternalSegmentCreatedEvent event);
        void visit(ExternalSegmentDeletedEvent event);
    }

    private class Handler implements Visitor {
        public void visit(SegmentLoadSucceededEvent event) {
            indexRegistry.getIndex(event.star)
                .loadSucceeded(
                    event.header,
                    event.body);

            event.monitor.sendEvent(
                new CellCacheSegmentCreateEvent(
                    event.timestamp,
                    event.serverId,
                    event.connectionId,
                    event.statementId,
                    event.executionId,
                    event.header.getConstrainedColumns().size(),
                    event.body == null
                        ? 0
                        : event.body.getValueMap().size(),
                    CellCacheSegmentCreateEvent.Source.SQL));
        }

        public void visit(SegmentLoadFailedEvent event) {
            indexRegistry.getIndex(event.star)
                .loadFailed(
                    event.header,
                    event.throwable);
        }

        public void visit(final SegmentRemoveEvent event) {
            indexRegistry.getIndex(event.star)
                .remove(event.header);

            event.monitor.sendEvent(
                new CellCacheSegmentDeleteEvent(
                    event.timestamp,
                    event.serverId,
                    event.connectionId,
                    event.statementId,
                    event.executionId,
                    event.header.getConstrainedColumns().size(),
                    CellCacheEvent.Source.CACHE_CONTROL));

            // Remove the segment from external caches. Use an executor, because
            // it may take some time. A failure inside the task is logged and
            // swallowed, because we don't care too much if it fails.
            final Future<?> future = event.cacheMgr.cacheExecutor.submit(
                new Runnable() {
                    public void run() {
                        try {
                            // Note that the SegmentCache API doesn't require
                            // us to verify that the segment exists (by calling
                            // "contains") before we call "remove".
                            event.cacheMgr.compositeCache.remove(event.header);
                        } catch (Throwable e) {
                            LOGGER.warn(
                                "remove header failed: " + event.header,
                                e);
                        }
                    }
                }
            );
            Util.safeGet(future, "SegmentCacheManager.segmentremoved");
        }

        public void visit(ExternalSegmentCreatedEvent event) {
            final SegmentCacheIndex index =
                event.cacheMgr.indexRegistry.getIndex(event.header);
            if (index != null) {
                index.add(event.header, false, null);
                event.monitor.sendEvent(
                    new CellCacheSegmentCreateEvent(
                        event.timestamp,
                        event.serverId,
                        event.connectionId,
                        event.statementId,
                        event.executionId,
                        event.header.getConstrainedColumns().size(),
                        0,
                        CellCacheEvent.Source.EXTERNAL));
            }
        }

        public void visit(ExternalSegmentDeletedEvent event) {
            final SegmentCacheIndex index =
                event.cacheMgr.indexRegistry.getIndex(event.header);
            if (index != null) {
                index.remove(event.header);
                event.monitor.sendEvent(
                    new CellCacheSegmentDeleteEvent(
                        event.timestamp,
                        event.serverId,
                        event.connectionId,
                        event.statementId,
                        event.executionId,
                        event.header.getConstrainedColumns().size(),
                        CellCacheEvent.Source.EXTERNAL));
            }
        }
    }

    interface Message {
    }

    public static interface Command<T> extends Message, Callable<T> {
        Locus getLocus();
    }

    /**
     * Command to flush a particular region from cache.
     */
    public static final class FlushCommand implements Command<FlushResult> {
        private final CellRegion region;
        private final CacheControlImpl cacheControlImpl;
        private final Locus locus;
        private final SegmentCacheManager cacheMgr;

        public FlushCommand(
            Locus locus,
            SegmentCacheManager mgr,
            CellRegion region,
            CacheControlImpl cacheControlImpl)
        {
            this.locus = locus;
            this.cacheMgr = mgr;
            this.region = region;
            this.cacheControlImpl = cacheControlImpl;
        }

        public Locus getLocus() {
            return locus;
        }

        public FlushResult call() throws Exception {
            // For each measure and each star, ask the index
            // which headers intersect.
            final List<SegmentHeader> headers =
                new ArrayList<SegmentHeader>();
            final List<Member> measures =
                CacheControlImpl.findMeasures(region);
            final SegmentColumn[] flushRegion =
                CacheControlImpl.findAxisValues(region);
            final List<RolapStar> starList =
                CacheControlImpl.getStarList(region);

            for (Member member : measures) {
                if (!(member instanceof RolapStoredMeasure)) {
                    continue;
                }
                final RolapStoredMeasure storedMeasure =
                    (RolapStoredMeasure) member;
                final RolapStar star = storedMeasure.getCube().getStar();
                final SegmentCacheIndex index =
                    cacheMgr.indexRegistry.getIndex(star);
                headers.addAll(
                    index.intersectRegion(
                        member.getDimension().getSchema().getName(),
                        ((RolapSchema) member.getDimension().getSchema())
                            .getChecksum(),
                        storedMeasure.getCube().getName(),
                        storedMeasure.getName(),
                        storedMeasure.getCube().getStar()
                            .getFactTable().getAlias(),
                        flushRegion));
                if (cacheControlImpl.isTraceEnabled()) {
                    Collections.sort(
                        headers,
                        new Comparator<SegmentHeader>() {
                            public int compare(
                                SegmentHeader o1,
                                SegmentHeader o2)
                            {
                                return o1.getUniqueID()
                                    .compareTo(o2.getUniqueID());
                            }
                        });
                }
            }

            // If flushRegion
            // is empty, this means we must clear all
            // segments for the region's measures.
            if (flushRegion.length == 0) {
                for (final SegmentHeader header : headers) {
                    for (RolapStar star : starList) {
                        cacheMgr.indexRegistry.getIndex(star).remove(header);
                    }
                    // Remove the segment from external caches. Use an
                    // executor, because it may take some time. A failure
                    // inside the task is logged and swallowed, because we
                    // don't care too much if it fails.
                    cacheControlImpl.trace(
                        "discard segment - it cannot be constrained and maintain consistency:\n"
                        + header.getDescription());

                    final Future<?> task = cacheMgr.cacheExecutor.submit(
                        new Runnable() {
                            public void run() {
                                try {
                                    // Note that the SegmentCache API doesn't
                                    // require us to verify that the segment
                                    // exists (by calling "contains") before we
                                    // call "remove".
                                    cacheMgr.compositeCache.remove(header);
                                } catch (Throwable e) {
                                    LOGGER.warn(
                                        "remove header failed: " + header,
                                        e);
                                }
                            }
                        });
                    Util.safeGet(task, "SegmentCacheManager.flush");
                }
                return new FlushResult(
                    Collections.<Callable<Boolean>>emptyList());
            }

            // Now we know which headers intersect. For each of them,
            // we append an excluded region.
            //
            // TODO: Optimize the logic here. If a segment is mostly
            // empty, we should trash it completely.
            final List<Callable<Boolean>> callableList =
                new ArrayList<Callable<Boolean>>();
            for (final SegmentHeader header : headers) {
                if (!header.canConstrain(flushRegion)) {
                    // We have to delete that segment altogether.
                    cacheControlImpl.trace(
                        "discard segment - it cannot be constrained and maintain consistency:\n"
                        + header.getDescription());
                    for (RolapStar star : starList) {
                        cacheMgr.indexRegistry.getIndex(star).remove(header);
                    }
                    continue;
                }
                final SegmentHeader newHeader =
                    header.constrain(flushRegion);
                for (final SegmentCacheWorker worker
                    : cacheMgr.segmentCacheWorkers)
                {
                    callableList.add(
                        new Callable<Boolean>() {
                            public Boolean call() throws Exception {
                                boolean existed;
                                if (worker.supportsRichIndex()) {
                                    final SegmentBody sb = worker.get(header);
                                    existed = worker.remove(header);
                                    if (sb != null) {
                                        worker.put(newHeader, sb);
                                    }
                                } else {
                                    // The cache doesn't support rich index. We
                                    // have to clear the segment entirely.
                                    existed = worker.remove(header);
                                }
                                return existed;
                            }
                        });
                }
                for (RolapStar star : starList) {
                    SegmentCacheIndex index =
                        cacheMgr.indexRegistry.getIndex(star);
                    index.remove(header);
                    index.add(newHeader, false, null);
                }
            }

            // Done
            return new FlushResult(callableList);
        }
    }

    private class PrintCacheStateCommand
        implements SegmentCacheManager.Command<Void>
    {
        private final PrintWriter pw;
        private final Locus locus;
        private final CellRegion region;

        public PrintCacheStateCommand(
            CellRegion region,
            PrintWriter pw,
            Locus locus)
        {
            this.region = region;
            this.pw = pw;
            this.locus = locus;
        }

        public Void call() {
            final List<RolapStar> starList =
                CacheControlImpl.getStarList(region);
            Collections.sort(
                starList,
                new Comparator<RolapStar>() {
                    public int compare(RolapStar o1, RolapStar o2) {
                        return o1.getFactTable().getAlias().compareTo(
                            o2.getFactTable().getAlias());
                    }
                });
            for (RolapStar star : starList) {
                indexRegistry.getIndex(star)
                    .printCacheState(pw);
            }
            return null;
        }

        public Locus getLocus() {
            return locus;
        }
    }

    /**
     * Result of a {@link FlushCommand}. Contains a list of tasks that must
     * be executed by the caller (or by an executor) to flush segments from the
     * external cache(s).
     */
    public static class FlushResult {
        public final List<Callable<Boolean>> tasks;

        public FlushResult(List<Callable<Boolean>> tasks) {
            this.tasks = tasks;
        }
    }

    /**
     * Special exception, thrown only by {@link ShutdownCommand}, telling
     * the actor to shut down.
     */
    private static class PleaseShutdownException extends RuntimeException {
        private PleaseShutdownException() {
        }
    }

    private static class ShutdownCommand implements Command<String> {
        public ShutdownCommand() {
        }

        public String call() throws Exception {
            throw new PleaseShutdownException();
        }

        public Locus getLocus() {
            return null;
        }
    }

    private static abstract class Event implements Message {
        /**
         * Dispatches a call to the appropriate {@code visit} method on
         * {@link Visitor}.
         *
         * @param visitor Visitor
         */
        public abstract void acceptWithoutResponse(Visitor visitor);
    }

    /**
     * Copy-pasted from {@link mondrian.server.monitor.Monitor}. Consider
     * abstracting common code.
     */
    private static class Actor implements Runnable {

        private final BlockingQueue<Pair<Handler, Message>> eventQueue =
            new ArrayBlockingQueue<Pair<Handler, Message>>(1000);

        private final BlockingHashMap<Command<?>, Pair<Object, Throwable>>
            responseMap =
            new BlockingHashMap<Command<?>, Pair<Object, Throwable>>(1000);

        public void run() {
            try {
                for (;;) {
                    final Pair<Handler, Message> entry = eventQueue.take();
                    final Handler handler = entry.left;
                    final Message message = entry.right;
                    try {
                        // A message is either a command or an event.
                        // A command returns a value that must be read by
                        // the caller.
                        if (message instanceof Command<?>) {
                            Command<?> command = (Command<?>) message;
                            try {
                                Locus.push(command.getLocus());
                                Object result = command.call();
                                responseMap.put(
                                    command,
                                    Pair.of(result, (Throwable) null));
                            } catch (AbortException e) {
                                responseMap.put(
                                    command,
                                    Pair.of(null, (Throwable) e));
                            } catch (PleaseShutdownException e) {
                                responseMap.put(
                                    command,
                                    Pair.of(null, (Throwable) null));
                                return; // exit event loop
                            } catch (Throwable e) {
                                responseMap.put(
                                    command,
                                    Pair.of(null, e));
                            } finally {
                                Locus.pop(command.getLocus());
                            }
                        } else {
                            Event event = (Event) message;
                            event.acceptWithoutResponse(handler);

                            // Broadcast the event to anyone who is interested.
                            RolapUtil.MONITOR_LOGGER.debug(message);
                        }
                    } catch (Throwable e) {
                        // REVIEW: Somewhere better to send it?
                        e.printStackTrace();
                    }
                }
            } catch (InterruptedException e) {
                // REVIEW: Somewhere better to send it?
                e.printStackTrace();
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }

        <T> T execute(Handler handler, Command<T> command) {
            try {
                eventQueue.put(Pair.<Handler, Message>of(handler, command));
            } catch (InterruptedException e) {
                throw Util.newError(e, "Exception while executing " + command);
            }
            try {
                final Pair<Object, Throwable> pair =
                    responseMap.get(command);
                if (pair.right != null) {
                    if (pair.right instanceof RuntimeException) {
                        throw (RuntimeException) pair.right;
                    } else if (pair.right instanceof Error) {
                        throw (Error) pair.right;
                    } else {
                        throw new RuntimeException(pair.right);
                    }
                } else {
                    return (T) pair.left;
                }
            } catch (InterruptedException e) {
                throw Util.newError(e, "Exception while executing " + command);
            }
        }

        public void event(Handler handler, Event event) {
            try {
                eventQueue.put(Pair.<Handler, Message>of(handler, event));
            } catch (InterruptedException e) {
                throw Util.newError(e, "Exception while executing " + event);
            }
        }
    }

    private static class SegmentLoadSucceededEvent extends Event {
        private final SegmentHeader header;
        private final SegmentBody body;
        private final long timestamp;
        private final RolapStar star;
        private final int serverId;
        private final int connectionId;
        private final long statementId;
        private final long executionId;
        private final Monitor monitor;

        public SegmentLoadSucceededEvent(
            long timestamp,
            Monitor monitor,
            int serverId,
            int connectionId,
            long statementId,
            long executionId,
            RolapStar star,
            SegmentHeader header,
            SegmentBody body)
        {
            this.timestamp = timestamp;
            this.monitor = monitor;
            this.serverId = serverId;
            this.connectionId = connectionId;
            this.statementId = statementId;
            this.executionId = executionId;
            assert header != null;
            assert star != null;
            this.star = star;
            this.header = header;
            this.body = body; // may be null
        }

        public void acceptWithoutResponse(Visitor visitor) {
            visitor.visit(this);
        }
    }

    private static class SegmentLoadFailedEvent extends Event {
        private final SegmentHeader header;
        private final Throwable throwable;
        private final long timestamp;
        private final RolapStar star;
        private final Monitor monitor;
        private final int serverId;
        private final int connectionId;
        private final long statementId;
        private final long executionId;

        public SegmentLoadFailedEvent(
            long timestamp,
            Monitor monitor,
            int serverId,
            int connectionId,
            long statementId,
            long executionId,
            RolapStar star,
            SegmentHeader header,
            Throwable throwable)
        {
            this.timestamp = timestamp;
            this.monitor = monitor;
            this.serverId = serverId;
            this.connectionId = connectionId;
            this.statementId = statementId;
            this.executionId = executionId;
            this.star = star;
            this.throwable = throwable;
            assert header != null;
            this.header = header;
        }

        public void acceptWithoutResponse(Visitor visitor) {
            visitor.visit(this);
        }
    }

    private static class SegmentRemoveEvent extends Event {
        private final SegmentHeader header;
        private final long timestamp;
        private final Monitor monitor;
        private final int serverId;
        private final int connectionId;
        private final long statementId;
        private final long executionId;
        private final RolapStar star;
        private final SegmentCacheManager cacheMgr;

        public SegmentRemoveEvent(
            long timestamp,
            Monitor monitor,
            int serverId,
            int connectionId,
            long statementId,
            long executionId,
            SegmentCacheManager cacheMgr,
            RolapStar star,
            SegmentHeader header)
        {
            this.timestamp = timestamp;
            this.monitor = monitor;
            this.serverId = serverId;
            this.connectionId = connectionId;
            this.statementId = statementId;
            this.executionId = executionId;
            this.cacheMgr = cacheMgr;
            this.star = star;
            assert header != null;
            this.header = header;
        }

        public void acceptWithoutResponse(Visitor visitor) {
            visitor.visit(this);
        }
    }

    private static class ExternalSegmentCreatedEvent extends Event {
        private final SegmentCacheManager cacheMgr;
        private final SegmentHeader header;
        private final long timestamp;
        private final Monitor monitor;
        private final int serverId;
        private final int connectionId;
        private final long statementId;
        private final long executionId;

        public ExternalSegmentCreatedEvent(
            long timestamp,
            Monitor monitor,
            int serverId,
            int connectionId,
            long statementId,
            long executionId,
            SegmentCacheManager cacheMgr,
            SegmentHeader header)
        {
            this.timestamp = timestamp;
            this.monitor = monitor;
            this.serverId = serverId;
            this.connectionId = connectionId;
            this.statementId = statementId;
            this.executionId = executionId;
            assert header != null;
            assert cacheMgr != null;
            this.cacheMgr = cacheMgr;
            this.header = header;
        }

        public void acceptWithoutResponse(Visitor visitor) {
            visitor.visit(this);
        }
    }

    private static class ExternalSegmentDeletedEvent extends Event {
        private final SegmentCacheManager cacheMgr;
        private final SegmentHeader header;
        private final long timestamp;
        private final Monitor monitor;
        private final int serverId;
        private final int connectionId;
        private final long statementId;
        private final long executionId;

        public ExternalSegmentDeletedEvent(
            long timestamp,
            Monitor monitor,
            int serverId,
            int connectionId,
            long statementId,
            long
                executionId,
            SegmentCacheManager cacheMgr,
            SegmentHeader header)
        {
            this.timestamp = timestamp;
            this.monitor = monitor;
            this.serverId = serverId;
            this.connectionId = connectionId;
            this.statementId = statementId;
            this.executionId = executionId;
            assert header != null;
            assert cacheMgr != null;
            this.cacheMgr = cacheMgr;
            this.header = header;
        }

        public void acceptWithoutResponse(Visitor visitor) {
            visitor.visit(this);
        }
    }

    /**
     * Implementation of SegmentCacheListener that updates the
     * segment index of its aggregation manager instance when it receives
     * events from its assigned SegmentCache implementation.
     */
    private static class AsyncCacheListener
        implements SegmentCache.SegmentCacheListener
    {
        private final SegmentCacheManager cacheMgr;
        private final MondrianServer server;

        public AsyncCacheListener(
            SegmentCacheManager cacheMgr,
            MondrianServer server)
        {
            this.cacheMgr = cacheMgr;
            this.server = server;
        }

        public void handle(final SegmentCacheEvent e) {
            if (e.isLocal()) {
                return;
            }
            Locus.execute(
                Execution.NONE,
                "AsyncCacheListener.handle",
                new Locus.Action<Void>() {
                    public Void execute() {
                        final SegmentCacheManager.Command<Void> command;
                        final Locus locus = Locus.peek();
                        switch (e.getEventType()) {
                        case ENTRY_CREATED:
                            command =
                                new Command<Void>() {
                                    public Void call() {
                                        cacheMgr.externalSegmentCreated(
                                            e.getSource(),
                                            server);
                                        return null;
                                    }
                                    public Locus getLocus() {
                                        return locus;
                                    }
                                };
                            break;
                        case ENTRY_DELETED:
                            command =
                                new Command<Void>() {
                                    public Void call() {
                                        cacheMgr.externalSegmentDeleted(
                                            e.getSource(),
                                            server);
                                        return null;
                                    }
                                    public Locus getLocus() {
                                        return locus;
                                    }
                                };
                            break;
                        default:
                            throw new UnsupportedOperationException();
                        }
                        cacheMgr.execute(command);
                        return null;
                    }
                });
        }
    }

    /**
     * Makes a collection of {@link SegmentCacheWorker} objects (each of which
     * is backed by a {@link SegmentCache}) appear to be a SegmentCache.
     *
     * <p>For most operations, it is easier to operate on a single cache.
     * It is usually clear whether an operation should stop at the first
     * match or operate on all workers. (For example, {@link #remove} tries
     * to remove the segment header from all workers, and returns whether it
     * was removed from any of them.) This class just does what seems
     * most typical. If you want another behavior for a particular operation,
     * operate on the workers directly.</p>
     */
    static class CompositeSegmentCache implements SegmentCache {
        final List<SegmentCacheWorker> workers;

        public CompositeSegmentCache(List<SegmentCacheWorker> workers) {
            this.workers = workers;
        }

        public SegmentBody get(SegmentHeader header) {
            for (SegmentCacheWorker worker : workers) {
                final SegmentBody body = worker.get(header);
                if (body != null) {
                    return body;
                }
            }
            return null;
        }

        public List<SegmentHeader> getSegmentHeaders() {
            if (MondrianProperties.instance().DisableCaching.get()) {
                return Collections.emptyList();
            }
            // Special case 0 and 1 workers, for which the 'union' operation
            // is trivial.
            switch (workers.size()) {
            case 0:
                return Collections.emptyList();
            case 1:
                return workers.get(0).getSegmentHeaders();
            default:
                final List<SegmentHeader> list = new ArrayList<SegmentHeader>();
                final Set<SegmentHeader> set = new HashSet<SegmentHeader>();
                for (SegmentCacheWorker worker : workers) {
                    for (SegmentHeader header : worker.getSegmentHeaders()) {
                        if (set.add(header)) {
                            list.add(header);
                        }
                    }
                }
                return list;
            }
        }

        public boolean put(SegmentHeader header, SegmentBody body) {
            if (MondrianProperties.instance().DisableCaching.get()) {
                return true;
            }
            for (SegmentCacheWorker worker : workers) {
                worker.put(header, body);
            }
            return true;
        }

        public boolean remove(SegmentHeader header) {
            boolean result = false;
            for (SegmentCacheWorker worker : workers) {
                if (worker.remove(header)) {
                    result = true;
                }
            }
            return result;
        }

        public void tearDown() {
            for (SegmentCacheWorker worker : workers) {
                worker.shutdown();
            }
        }

        public void addListener(SegmentCacheListener listener) {
            for (SegmentCacheWorker worker : workers) {
                worker.cache.addListener(listener);
            }
        }

        public void removeListener(SegmentCacheListener listener) {
            for (SegmentCacheWorker worker : workers) {
                worker.cache.removeListener(listener);
            }
        }

        public boolean supportsRichIndex() {
            for (SegmentCacheWorker worker : workers) {
                if (!worker.supportsRichIndex()) {
                    return false;
                }
            }
            return true;
        }
    }

    /**
     * Locates segments in the cache that satisfy a given request.
     *
     * <p>The result consists of (a) a list of segment headers, (b) a list
     * of futures for segment bodies that are currently being loaded, (c)
     * converters to convert headers into {@link SegmentWithData}.</p>
     *
     * <p>For (a), the client should call the cache to get the body for each
     * segment header; it is possible that there is no body in the cache.
     * For (b), the client will have to wait for the segment to arrive.</p>
     */
    private class PeekCommand
        implements SegmentCacheManager.Command<PeekResponse>
    {
        private final CellRequest request;
        private final Locus locus;

        /**
         * Creates a PeekCommand.
         *
         * @param request Cell request
         * @param locus Locus
         */
        public PeekCommand(
            CellRequest request,
            Locus locus)
        {
            this.request = request;
            this.locus = locus;
        }

        public PeekResponse call() {
            final RolapStar.Measure measure = request.getMeasure();
            final RolapStar star = measure.getStar();
            final RolapSchema schema = star.getSchema();
            final AggregationKey key = new AggregationKey(request);
            final List<SegmentHeader> headers =
                indexRegistry.getIndex(star)
                    .locate(
                        schema.getName(),
                        schema.getChecksum(),
                        measure.getCubeName(),
                        measure.getName(),
                        star.getFactTable().getAlias(),
                        request.getConstrainedColumnsBitKey(),
                        request.getMappedCellValues(),
                        AggregationKey.getCompoundPredicateStringList(
                            star,
                            key.getCompoundPredicateList()));

            final Map<SegmentHeader, Future<SegmentBody>> headerMap =
                new HashMap<SegmentHeader, Future<SegmentBody>>();
            final Map<List, SegmentBuilder.SegmentConverter> converterMap =
                new HashMap<List, SegmentBuilder.SegmentConverter>();

            // Is there a pending segment? (A segment that has been created and
            // is loading via SQL.)
            for (final SegmentHeader header : headers) {
                final Future<SegmentBody> bodyFuture =
                    indexRegistry.getIndex(star)
                        .getFuture(locus.execution, header);
                if (bodyFuture != null) {
                    // Check if the DataSourceChangeListener wants us to clear
                    // the current segment
                    if (star.getChangeListener() != null
                        && star.getChangeListener().isAggregationChanged(key))
                    {
                        // We can't satisfy this request, and we must clear the
                        // data from our cache. This must be in sync with the
                        // actor thread to maintain consistency.
                        indexRegistry.getIndex(star).remove(header);
                        Util.safeGet(
                            cacheExecutor.submit(
                                new Runnable() {
                                    public void run() {
                                        try {
                                            compositeCache.remove(header);
                                        } catch (Throwable e) {
                                            LOGGER.warn(
                                                "remove header failed: "
                                                + header,
                                                e);
                                        }
                                    }
                                }),
                            "SegmentCacheManager.peek");
                        continue;
                    }
                    converterMap.put(
                        SegmentCacheIndexImpl.makeConverterKey(header),
                        getConverter(star, header));
                    headerMap.put(
                        header, bodyFuture);
                }
            }

            return new PeekResponse(headerMap, converterMap);
        }

        public Locus getLocus() {
            return locus;
        }
    }

    private static class PeekResponse {
        public final Map<SegmentHeader, Future<SegmentBody>> headerMap;
        public final Map<List, SegmentBuilder.SegmentConverter> converterMap;

        public PeekResponse(
            Map<SegmentHeader, Future<SegmentBody>> headerMap,
            Map<List, SegmentBuilder.SegmentConverter> converterMap)
        {
            this.headerMap = headerMap;
            this.converterMap = converterMap;
        }
    }

    /**
     * Registry of all the indexes that were created for this
     * cache manager, per {@link RolapStar}.
     */
    public class SegmentCacheIndexRegistry {
        private final Map<RolapStar, SegmentCacheIndex> indexes =
            new WeakHashMap<RolapStar, SegmentCacheIndex>();
        /**
         * Returns the {@link SegmentCacheIndex} for a given
         * {@link RolapStar}.
         */
        public SegmentCacheIndex getIndex(RolapStar star) {
            if (!indexes.containsKey(star)) {
                indexes.put(star, new SegmentCacheIndexImpl(thread));
            }
            return indexes.get(star);
        }
        /**
         * Returns the {@link SegmentCacheIndex} for a given
         * {@link SegmentHeader}.
         */
        private SegmentCacheIndex getIndex(
            SegmentHeader header)
        {
            // First we check the indexes that already exist.
            // This is fast.
            for (Entry<RolapStar, SegmentCacheIndex> entry
                : indexes.entrySet())
            {
                final String factTableName =
                    entry.getKey().getFactTable().getTableName();
                final ByteString schemaChecksum =
                    entry.getKey().getSchema().getChecksum();
                if (!factTableName.equals(header.rolapStarFactTableName)) {
                    continue;
                }
                if (!schemaChecksum.equals(header.schemaChecksum)) {
                    continue;
                }
                return entry.getValue();
            }
            // The index doesn't exist. Let's create it.
            for (RolapSchema schema : RolapSchema.getRolapSchemas()) {
                if (!schema.getChecksum().equals(header.schemaChecksum)) {
                    continue;
                }
                // We have a schema match.
                RolapStar star =
                    schema.getStar(header.rolapStarFactTableName);
                return getIndex(star);
            }
            return null;
        }
        public void cancelExecutionSegments(Execution exec) {
            for (SegmentCacheIndex index : indexes.values()) {
                index.cancel(exec);
            }
        }
    }

    /**
     * Exception which someone can throw to indicate to the Actor that
     * whatever it was doing is not needed anymore. Won't trigger any output
     * to the logs.
     *
     * <p>If your {@link Command} throws this, it will be sent back at you.
     * You must handle it.</p>
     */
    public static final class AbortException extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }
}

// End SegmentCacheManager.java