/*
// This software is subject to the terms of the Eclipse Public License v1.0
// Agreement, available at the following URL:
// http://www.eclipse.org/legal/epl-v10.html.
// You must accept the terms of that agreement to use this software.
//
// Copyright (C) 2004-2005 Julian Hyde
// Copyright (C) 2005-2013 Pentaho and others
// All Rights Reserved.
*/
package mondrian.rolap;

import mondrian.olap.*;
import mondrian.rolap.agg.*;
import mondrian.rolap.aggmatcher.AggGen;
import mondrian.rolap.aggmatcher.AggStar;
import mondrian.rolap.cache.SegmentCacheIndex;
import mondrian.rolap.cache.SegmentCacheIndexImpl;
import mondrian.server.Execution;
import mondrian.server.Locus;
import mondrian.spi.*;
import mondrian.util.*;

import org.apache.log4j.Logger;
import org.apache.log4j.MDC;

import java.util.*;
import java.util.concurrent.Future;

/**
 * A <code>FastBatchingCellReader</code> doesn't really read cells: when asked
 * to look up the values of stored measures, it lies, and records the fact
 * that the value was asked for. Later, we can look over the values which
 * are required, fetch them in an efficient way, and re-run the evaluation
 * with a real evaluator.
 *
 * <p>NOTE: When it doesn't know the answer, it lies by returning an error
 * object. The calling code must be able to deal with that.</p>
 *
 * <p>This class tries to minimize the amount of storage needed to record the
 * fact that a cell was requested.</p>
 */
public class FastBatchingCellReader implements CellReader {
    private static final Logger LOGGER =
        Logger.getLogger(FastBatchingCellReader.class);

    private final int cellRequestLimit;

    private final RolapCube cube;

    /**
     * Records the number of requests. The field is used for correctness: if
     * the request count stays the same during an operation, you know that the
     * FastBatchingCellReader has not told any lies during that operation, and
     * therefore the result is true. The field is also useful for debugging.
     */
    private int missCount;

    /**
     * Number of occasions that a requested cell was already in cache.
     */
    private int hitCount;

    /**
     * Number of occasions that a requested cell was in the process of being
     * loaded into cache but not ready.
     */
    private int pendingCount;

    private final AggregationManager aggMgr;

    private final boolean cacheEnabled;

    private final SegmentCacheManager cacheMgr;

    private final RolapAggregationManager.PinSet pinnedSegments;

    /**
     * Indicates that the reader has given incorrect results.
     */
    private boolean dirty;

    private final List<CellRequest> cellRequests = new ArrayList<CellRequest>();

    private final Execution execution;

    /**
     * Creates a FastBatchingCellReader.
     *
     * @param execution Execution that calling statement belongs to. Allows us
     *     to check for cancel
     * @param cube Cube that requests belong to
     * @param aggMgr Aggregation manager
     */
    public FastBatchingCellReader(
        Execution execution,
        RolapCube cube,
        AggregationManager aggMgr)
    {
        this.execution = execution;
        assert cube != null;
        assert execution != null;
        this.cube = cube;
        this.aggMgr = aggMgr;
        cacheMgr = aggMgr.cacheMgr;
        pinnedSegments = this.aggMgr.createPinSet();
        cacheEnabled = !MondrianProperties.instance().DisableCaching.get();

        cellRequestLimit =
            MondrianProperties.instance().CellBatchSize.get() <= 0
                ? 100000 // TODO Make this logic into a pluggable algorithm.
                : MondrianProperties.instance().CellBatchSize.get();
    }

    public Object get(RolapEvaluator evaluator) {
        final CellRequest request =
            RolapAggregationManager.makeRequest(evaluator);

        if (request == null || request.isUnsatisfiable()) {
            return Util.nullValue; // request not satisfiable.
        }

        // Try to retrieve a cell and simultaneously pin the segment which
        // contains it.
        final Object o = aggMgr.getCellFromCache(request, pinnedSegments);

        assert o != Boolean.TRUE : "getCellFromCache no longer returns TRUE";
        if (o != null) {
            ++hitCount;
            return o;
        }

        // If this query has not had any cache misses, it's worth doing a
        // synchronous request for the cell segment. If it is in the cache, it
        // will be worth the wait, because we can avoid the effort of batching
        // up requests that could have been satisfied by the same segment.
        if (cacheEnabled
            && missCount == 0)
        {
            SegmentWithData segmentWithData = cacheMgr.peek(request);
            if (segmentWithData != null) {
                segmentWithData.getStar().register(segmentWithData);
                final Object o2 =
                    aggMgr.getCellFromCache(request, pinnedSegments);
                if (o2 != null) {
                    ++hitCount;
                    return o2;
                }
            }
        }

        // if there is no such cell, record that we need to fetch it, and
        // return 'error'
        recordCellRequest(request);
        return RolapUtil.valueNotReadyException;
    }

    public int getMissCount() {
        return missCount;
    }

    public int getHitCount() {
        return hitCount;
    }

    public int getPendingCount() {
        return pendingCount;
    }

    public final void recordCellRequest(CellRequest request) {
        assert !request.isUnsatisfiable();
        ++missCount;
        cellRequests.add(request);
        if (cellRequests.size() % cellRequestLimit == 0) {
            // Signal that it's time to ask the cache manager if it has cells
            // we need in the cache. Not really an exception.
            throw CellRequestQuantumExceededException.INSTANCE;
        }
    }

    /**
     * Returns whether this reader has told a lie. This is the case if there
     * are pending batches to load or if {@link #setDirty(boolean)} has been
     * called.
     */
    public boolean isDirty() {
        return dirty || !cellRequests.isEmpty();
    }

    /**
     * Resolves any pending cell reads using the cache. After calling this
     * method, all cells requested in a given batch are loaded into this
     * statement's local cache.
     *
     * <p>The method is implemented by making an asynchronous call to the cache
     * manager. The result is a list of segments that satisfies every cell
     * request.</p>
     *
     * <p>The client should put the resulting segments into its "query local"
     * cache, to ensure that future cells in that segment can be answered
     * without a call to the cache manager. (That is probably 1000x faster.)</p>
     *
     * <p>The cache manager does not tell the client where each segment
     * came from. There are several possibilities:</p>
     *
     * <ul>
     * <li>Segment was already in cache (header and body)</li>
     * <li>Segment is in the process of being loaded by executing a SQL
     *     statement (probably due to a request from another client)</li>
     * <li>Segment is in an external cache (that is, header is in the cache,
     *     body is not yet)</li>
     * <li>Segment can be created by rolling up one or more cache segments.
     *     (And of course each of these segments might be "paged out".)</li>
     * <li>By executing a SQL {@code GROUP BY} statement</li>
     * </ul>
     *
     * <p>Furthermore, segments in external cache may take some time to
     * retrieve (a LAN round trip, say 1 millisecond, is a reasonable guess);
     * and the request may fail. (It depends on the cache, but caches are at
     * liberty to 'forget' segments.) So, any strategy that relies on cache
     * segments should be able to fall back. Even if there are fallbacks, only
     * one call needs to be made to the cache manager.</p>
     *
     * @return Whether any aggregations were loaded.
     */
    boolean loadAggregations() {
        if (!isDirty()) {
            return false;
        }

        // List of futures yielding segments populated by SQL statements. If
        // loading requires several iterations, we just append to the list. We
        // don't mind if it takes a while for SQL statements to return.
        final List<Future<Map<Segment, SegmentWithData>>> sqlSegmentMapFutures =
            new ArrayList<Future<Map<Segment, SegmentWithData>>>();

        final List<CellRequest> cellRequests1 =
            new ArrayList<CellRequest>(cellRequests);

        for (int iteration = 0;; ++iteration) {
            final BatchLoader.LoadBatchResponse response =
                cacheMgr.execute(
                    new BatchLoader.LoadBatchCommand(
                        Locus.peek(),
                        cacheMgr,
                        getDialect(),
                        cube,
                        Collections.unmodifiableList(cellRequests1)));

            int failureCount = 0;

            // Segments that have been retrieved from cache this cycle. Allows
            // us to reduce calls to the external cache.
            Map<SegmentHeader, SegmentBody> headerBodies =
                new HashMap<SegmentHeader, SegmentBody>();

            // Load each suggested segment from cache, and place it in
            // thread-local cache. Note that this step can't be done by the
            // cacheMgr -- it's our cache.
            for (SegmentHeader header : response.cacheSegments) {
                final SegmentBody body = cacheMgr.compositeCache.get(header);
                if (body == null) {
                    // REVIEW: This is an async call. It will return before the
                    // index is informed that this header is there,
                    // so a LoadBatchCommand might still return
                    // it on the next iteration.
                    if (cube.getStar() != null) {
                        cacheMgr.remove(cube.getStar(), header);
                    }
                    ++failureCount;
                    continue;
                }
                headerBodies.put(header, body);
                final SegmentWithData segmentWithData =
                    response.convert(header, body);
                segmentWithData.getStar().register(segmentWithData);
            }

            // Perform each suggested rollup.
            //
            // TODO this could be improved.
            // See http://jira.pentaho.com/browse/MONDRIAN-1195

            // Rollups that succeeded. Will tell cache mgr to put the headers
            // into the index and the header/bodies in cache.
            final Map<SegmentHeader, SegmentBody> succeededRollups =
                new HashMap<SegmentHeader, SegmentBody>();

            for (final BatchLoader.RollupInfo rollup : response.rollups) {
                // Gather the required segments.
                Map<SegmentHeader, SegmentBody> map =
                    findResidentRollupCandidate(headerBodies, rollup);
                if (map == null) {
                    // None of the candidate segment-sets for this rollup was
                    // all present in the cache.
                    continue;
                }

                final Set<String> keepColumns = new HashSet<String>();
                for (RolapStar.Column column : rollup.constrainedColumns) {
                    keepColumns.add(
                        column.getExpression().getGenericExpression());
                }
                Pair<SegmentHeader, SegmentBody> rollupHeaderBody =
                    SegmentBuilder.rollup(
                        map,
                        keepColumns,
                        rollup.constrainedColumnsBitKey,
                        rollup.measure.getAggregator().getRollup(),
                        rollup.measure.getDatatype());

                final SegmentHeader header = rollupHeaderBody.left;
                final SegmentBody body = rollupHeaderBody.right;

                if (headerBodies.containsKey(header)) {
                    // We had already created this segment, somehow.
                    continue;
                }

                headerBodies.put(header, body);
                succeededRollups.put(header, body);

                final SegmentWithData segmentWithData =
                    response.convert(header, body);

                // Register this segment with the local star.
                segmentWithData.getStar().register(segmentWithData);

                // Make sure that the cache manager knows about this new
                // segment. First thing we do is to add it to the index.
                // Then we insert the segment body into the SlotFuture.
                // This has to be done on the SegmentCacheManager's
                // Actor thread to ensure thread safety.
                if (!MondrianProperties.instance().DisableCaching.get()) {
                    final Locus locus = Locus.peek();
                    cacheMgr.execute(
                        new SegmentCacheManager.Command<Void>() {
                            public Void call() throws Exception {
                                SegmentCacheIndex index =
                                    cacheMgr.getIndexRegistry()
                                        .getIndex(segmentWithData.getStar());
                                boolean added = index.add(
                                    segmentWithData.getHeader(),
                                    true,
                                    response.converterMap.get(
                                        SegmentCacheIndexImpl
                                            .makeConverterKey(
                                                segmentWithData.getHeader())));
                                if (added) {
                                    index.loadSucceeded(
                                        segmentWithData.getHeader(), body);
                                }
                                return null;
                            }
                            public Locus getLocus() {
                                return locus;
                            }
                        });
                }
            }

            // Wait for SQL statements to end -- but only if there are no
            // failures.
            //
            // If there are failures, and it's the first iteration, it's more
            // urgent that we create and execute a follow-up request. We will
            // wait for the pending SQL statements at the end of that.
            //
            // If there are failures on later iterations, wait for SQL
            // statements to end. The cache might be porous. SQL might be the
            // only way to make progress.
            sqlSegmentMapFutures.addAll(response.sqlSegmentMapFutures);
            if (failureCount == 0 || iteration > 0) {
                // Wait on segments being loaded by someone else.
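                //
                // (Sketch of the wait performed below, under the assumption
                // that Util.safeGet simply blocks on Future.get and rethrows
                // any failure wrapped with the given message:
                //
                //     SegmentBody body = bodyFuture.get();   // may block
                //     SegmentWithData swd = response.convert(header, body);
                //     swd.getStar().register(swd);
                // )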
                for (Map.Entry<SegmentHeader, Future<SegmentBody>> entry
                    : response.futures.entrySet())
                {
                    final SegmentHeader header = entry.getKey();
                    final Future<SegmentBody> bodyFuture = entry.getValue();
                    final SegmentBody body = Util.safeGet(
                        bodyFuture,
                        "Waiting for someone else's segment to load via SQL");
                    final SegmentWithData segmentWithData =
                        response.convert(header, body);
                    segmentWithData.getStar().register(segmentWithData);
                }

                // Wait on segments being loaded by SQL statements we asked for.
                for (Future<Map<Segment, SegmentWithData>> sqlSegmentMapFuture
                    : sqlSegmentMapFutures)
                {
                    final Map<Segment, SegmentWithData> segmentMap =
                        Util.safeGet(
                            sqlSegmentMapFuture,
                            "Waiting for segment to load via SQL");
                    for (SegmentWithData segmentWithData : segmentMap.values())
                    {
                        segmentWithData.getStar().register(segmentWithData);
                    }
                    // TODO: also pass back SegmentHeader and SegmentBody,
                    // and add these to headerBodies. Might help?
                }
            }

            if (failureCount == 0) {
                break;
            }

            // Figure out which cell requests are not satisfied by any of the
            // segments retrieved.
            @SuppressWarnings("unchecked")
            List<CellRequest> old = new ArrayList<CellRequest>(cellRequests1);
            cellRequests1.clear();
            for (CellRequest cellRequest : old) {
                if (cellRequest.getMeasure().getStar()
                    .getCellFromCache(cellRequest, null) == null)
                {
                    cellRequests1.add(cellRequest);
                }
            }

            if (cellRequests1.isEmpty()) {
                break;
            }

            if (cellRequests1.size() >= old.size()
                && iteration > 10)
            {
                throw Util.newError(
                    "Cache round-trip did not resolve any cell requests. "
                    + "Iteration #" + iteration
                    + "; request count " + cellRequests1.size()
                    + "; requested headers: " + response.cacheSegments.size()
                    + "; requested rollups: " + response.rollups.size()
                    + "; requested SQL: "
                    + response.sqlSegmentMapFutures.size());
            }

            // Continue loop; form and execute a new request with the smaller
            // set of cell requests.
        }

        dirty = false;
        cellRequests.clear();
        return true;
    }

    /**
     * Finds a segment-list among a list of candidate segment-lists
     * for which the bodies of all segments are in cache. Returns a map
     * from segment-to-body if found, or null if not found.
     *
     * @param headerBodies Cache of bodies previously retrieved from external
     *                     cache
     *
     * @param rollup Specifies what segments to roll up, and the
     *               target dimensionality
     *
     * @return Collection of segment headers and bodies suitable for rollup,
     *     or null
     */
    private Map<SegmentHeader, SegmentBody> findResidentRollupCandidate(
        Map<SegmentHeader, SegmentBody> headerBodies,
        BatchLoader.RollupInfo rollup)
    {
        candidateLoop:
        for (List<SegmentHeader> headers : rollup.candidateLists) {
            final Map<SegmentHeader, SegmentBody> map =
                new HashMap<SegmentHeader, SegmentBody>();
            for (SegmentHeader header : headers) {
                SegmentBody body = loadSegmentFromCache(headerBodies, header);
                if (body == null) {
                    // To proceed with a candidate, require all headers to
                    // be in cache.
                    continue candidateLoop;
                }
                map.put(header, body);
            }
            return map;
        }
        return null;
    }

    private SegmentBody loadSegmentFromCache(
        Map<SegmentHeader, SegmentBody> headerBodies,
        SegmentHeader header)
    {
        SegmentBody body = headerBodies.get(header);
        if (body != null) {
            return body;
        }
        body = cacheMgr.compositeCache.get(header);
        if (body == null) {
            if (cube.getStar() != null) {
                cacheMgr.remove(cube.getStar(), header);
            }
            return null;
        }
        headerBodies.put(header, body);
        return body;
    }

    /**
     * Returns the SQL dialect. Overridden in some unit tests.
     *
     * @return Dialect
     */
    Dialect getDialect() {
        final RolapStar star = cube.getStar();
        if (star != null) {
            return star.getSqlQueryDialect();
        } else {
            return cube.getSchema().getDialect();
        }
    }

    /**
     * Sets the flag indicating that the reader has told a lie.
     */
    void setDirty(boolean dirty) {
        this.dirty = dirty;
    }

}

/**
 * Context for processing a request to the cache manager for segments matching a
 * collection of cell requests. All methods except the constructor are executed
 * by the cache manager's dedicated thread.
 */
class BatchLoader {
    private static final Logger LOGGER =
        Logger.getLogger(FastBatchingCellReader.class);

    private final Locus locus;
    private final SegmentCacheManager cacheMgr;
    private final Dialect dialect;
    private final RolapCube cube;

    private final Map<AggregationKey, Batch> batches =
        new HashMap<AggregationKey, Batch>();

    private final Set<SegmentHeader> cacheHeaders =
        new LinkedHashSet<SegmentHeader>();

    private final Map<SegmentHeader, Future<SegmentBody>> futures =
        new HashMap<SegmentHeader, Future<SegmentBody>>();

    private final List<RollupInfo> rollups = new ArrayList<RollupInfo>();

    private final Set<BitKey> rollupBitmaps = new HashSet<BitKey>();

    private final Map<List, SegmentBuilder.SegmentConverter> converterMap =
        new HashMap<List, SegmentBuilder.SegmentConverter>();

    public BatchLoader(
        Locus locus,
        SegmentCacheManager cacheMgr,
        Dialect dialect,
        RolapCube cube)
    {
        this.locus = locus;
        this.cacheMgr = cacheMgr;
        this.dialect = dialect;
        this.cube = cube;
    }

    final boolean shouldUseGroupingFunction() {
        return MondrianProperties.instance().EnableGroupingSets.get()
            && dialect.supportsGroupingSets();
    }

    private void recordCellRequest2(final CellRequest request) {
        // If there is a segment matching these criteria, write it to the list
        // of found segments, and remove the cell request from the list.
        final AggregationKey key = new AggregationKey(request);

        final SegmentBuilder.SegmentConverterImpl converter =
            new SegmentBuilder.SegmentConverterImpl(key, request);

        boolean success =
            loadFromCaches(request, key, converter);
        // Skip the batch if we already have a rollup for it.
        if (rollupBitmaps.contains(request.getConstrainedColumnsBitKey())) {
            return;
        }

        // As a last resort, we load from SQL.
        if (!success) {
            loadFromSql(request, key, converter);
        }
    }

    /**
     * Loads a cell from caches. If the cell is successfully loaded,
     * we return true.
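     *
     * <p>There are three ways the load can succeed: the request matches a
     * header this batch has already chosen to load; the segment index locates
     * a matching header in cache (whose body is either present or still being
     * loaded by another worker); or, if in-memory rollup is enabled, the index
     * finds candidate segments from which the requested cell can be rolled
     * up.</p>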
     */
    private boolean loadFromCaches(
        final CellRequest request,
        final AggregationKey key,
        final SegmentBuilder.SegmentConverterImpl converter)
    {
        if (MondrianProperties.instance().DisableCaching.get()) {
            // Caching is disabled. Always return false.
            return false;
        }

        // Is request matched by one of the headers we intend to load?
        final Map<String, Comparable> mappedCellValues =
            request.getMappedCellValues();
        final List<String> compoundPredicates =
            AggregationKey.getCompoundPredicateStringList(
                key.getStar(),
                key.getCompoundPredicateList());

        for (SegmentHeader header : cacheHeaders) {
            if (SegmentCacheIndexImpl.matches(
                    header,
                    mappedCellValues,
                    compoundPredicates))
            {
                // It's likely that the header will be in the cache, so this
                // request will be satisfied. If not, the header will be
                // removed from the segment index, and we'll be back.
                return true;
            }
        }
        final RolapStar.Measure measure = request.getMeasure();
        final RolapStar star = measure.getStar();
        final RolapSchema schema = star.getSchema();
        final SegmentCacheIndex index =
            cacheMgr.getIndexRegistry().getIndex(star);
        final List<SegmentHeader> headersInCache =
            index.locate(
                schema.getName(),
                schema.getChecksum(),
                measure.getCubeName(),
                measure.getName(),
                star.getFactTable().getAlias(),
                request.getConstrainedColumnsBitKey(),
                mappedCellValues,
                compoundPredicates);

        // Ask for the first segment to be loaded from cache. (If it's no
        // longer in cache, we'll be back, and presumably we'll try the second
        // segment.)

        if (!headersInCache.isEmpty()) {
            final SegmentHeader headerInCache = headersInCache.get(0);

            final Future<SegmentBody> future =
                index.getFuture(locus.execution, headerInCache);

            if (future != null) {
                // Segment header is in cache, body is being loaded. Worker
                // will need to wait for load to complete.
                futures.put(headerInCache, future);
            } else {
                // Segment is in cache.
                cacheHeaders.add(headerInCache);
            }
            index.setConverter(
                headerInCache.schemaName,
                headerInCache.schemaChecksum,
                headerInCache.cubeName,
                headerInCache.rolapStarFactTableName,
                headerInCache.measureName,
                headerInCache.compoundPredicates,
                converter);
            converterMap.put(
                SegmentCacheIndexImpl.makeConverterKey(request, key),
                converter);
            return true;
        }

        // Try to roll up if the measure's rollup aggregator supports
        // "fast" aggregation from raw objects.
        //
        // Do not try to roll up if this request has already chosen a rollup
        // with the same target dimensionality. It is quite likely that the
        // other rollup will satisfy this request, and it's complicated to be
        // 100% sure. If we're wrong, we'll be back.

        // Also make sure that we don't try to rollup a measure which
        // doesn't support rollup from raw data, like a distinct count
        // for example. Both the measure's aggregator and its rollup
        // aggregator must support raw data aggregation. We call
        // Aggregator.supportsFastAggregates() to verify.
        if (MondrianProperties.instance()
                .EnableInMemoryRollup.get()
            && measure.getAggregator().supportsFastAggregates(
                measure.getDatatype())
            && measure.getAggregator().getRollup().supportsFastAggregates(
                measure.getDatatype())
            && !isRequestCoveredByRollups(request))
        {
            // Don't even bother doing a segment lookup if we can't
            // rollup that measure.
            final List<List<SegmentHeader>> rollup =
                index.findRollupCandidates(
                    schema.getName(),
                    schema.getChecksum(),
                    measure.getCubeName(),
                    measure.getName(),
                    star.getFactTable().getAlias(),
                    request.getConstrainedColumnsBitKey(),
                    mappedCellValues,
                    AggregationKey.getCompoundPredicateStringList(
                        star,
                        key.getCompoundPredicateList()));
            if (!rollup.isEmpty()) {
                rollups.add(
                    new RollupInfo(
                        request,
                        rollup));
                rollupBitmaps.add(request.getConstrainedColumnsBitKey());
                converterMap.put(
                    SegmentCacheIndexImpl.makeConverterKey(request, key),
                    new SegmentBuilder.StarSegmentConverter(
                        measure,
                        key.getCompoundPredicateList()));
                return true;
            }
        }
        return false;
    }

    /**
     * Checks if the request can be satisfied by a rollup already in place
     * and moves that rollup to the top of the list if not there.
     */
    private boolean isRequestCoveredByRollups(CellRequest request) {
        BitKey bitKey = request.getConstrainedColumnsBitKey();
        if (!rollupBitmaps.contains(bitKey)) {
            return false;
        }
        List<SegmentHeader> firstOkList = null;
        for (RollupInfo rollupInfo : rollups) {
            if (!rollupInfo.constrainedColumnsBitKey.equals(bitKey)) {
                continue;
            }
            int candidateListsIdx = 0;
            // bitkey is the same, are the constrained values compatible?
            candidatesLoop:
            for (List<SegmentHeader> candList
                : rollupInfo.candidateLists)
            {
                for (SegmentHeader header : candList) {
                    if (headerCoversRequest(header, request)) {
                        firstOkList = candList;
                        break candidatesLoop;
                    }
                }
                candidateListsIdx++;
            }
            if (firstOkList != null) {
                if (candidateListsIdx > 0) {
                    // move good candidate list to first position
                    rollupInfo.candidateLists.remove(candidateListsIdx);
                    rollupInfo.candidateLists.set(0, firstOkList);
                }
                return true;
            }
        }
        return false;
    }

    /**
     * Check constraint compatibility
     */
    private boolean headerCoversRequest(
        SegmentHeader header,
        CellRequest request)
    {
        BitKey bitKey = request.getConstrainedColumnsBitKey();
        assert header.getConstrainedColumnsBitKey().cardinality()
            >= bitKey.cardinality();
        BitKey headerBitKey = header.getConstrainedColumnsBitKey();
        // get all constrained values for relevant bitKey positions
        List<SortedSet<Comparable>> headerValues =
            new ArrayList<SortedSet<Comparable>>(bitKey.cardinality());
        Map<Integer, Integer> valueIndexes = new HashMap<Integer, Integer>();
        int relevantCCIdx = 0, keyValuesIdx = 0;
        for (int bitPos : headerBitKey) {
            if (bitKey.get(bitPos)) {
                headerValues.add(
                    header.getConstrainedColumns().get(relevantCCIdx).values);
                valueIndexes.put(bitPos, keyValuesIdx++);
            }
            relevantCCIdx++;
        }
        assert request.getConstrainedColumns().length
            == request.getSingleValues().length;
        // match header constraints against request values
        for (int i = 0; i < request.getConstrainedColumns().length; i++) {
            RolapStar.Column col = request.getConstrainedColumns()[i];
            Integer valueIdx = valueIndexes.get(col.getBitPosition());
            if (headerValues.get(valueIdx) != null
                && !headerValues.get(valueIdx).contains(
                    request.getSingleValues()[i]))
            {
                return false;
            }
        }
        return true;
    }

    private void loadFromSql(
        final CellRequest request,
        final AggregationKey key,
        final SegmentBuilder.SegmentConverterImpl converter)
    {
        // Finally, add to a batch. It will turn into a SQL request.
        Batch batch = batches.get(key);
        if (batch == null) {
            batch = new Batch(request);
            batches.put(key, batch);
            converterMap.put(
                SegmentCacheIndexImpl.makeConverterKey(request, key),
                converter);

            if (LOGGER.isDebugEnabled()) {
                StringBuilder buf = new StringBuilder(100);
                buf.append("FastBatchingCellReader: bitkey=");
                buf.append(request.getConstrainedColumnsBitKey());
                buf.append(Util.nl);

                for (RolapStar.Column column
                    : request.getConstrainedColumns())
                {
                    buf.append(" ");
                    buf.append(column);
                    buf.append(Util.nl);
                }
                LOGGER.debug(buf.toString());
            }
        }
        batch.add(request);
    }

    /**
     * Determines which segments need to be loaded from external cache,
     * created using roll up, or created using SQL to satisfy a given list
     * of cell requests.
     *
     * @return List of segment futures. Each segment future may or may not be
     *     already present (it depends on the current location of the segment
     *     body). Each future will return a not-null segment (or throw).
     */
    LoadBatchResponse load(List<CellRequest> cellRequests) {
        // Check for cancel/timeout. The request might have been on the queue
        // for a while.
        if (locus.execution != null) {
            locus.execution.checkCancelOrTimeout();
        }

        final long t1 = System.currentTimeMillis();

        // Now we're inside the cache manager, we can see which of our cell
        // requests can be answered from cache. Those that can will be added
        // to the segments list; those that can not will be converted into
        // batches and rolled up or loaded using SQL.
        for (CellRequest cellRequest : cellRequests) {
            recordCellRequest2(cellRequest);
        }

        // Sort the batches into deterministic order.
        List<Batch> batchList =
            new ArrayList<Batch>(batches.values());
        Collections.sort(batchList, BatchComparator.instance);
        final List<Future<Map<Segment, SegmentWithData>>> segmentMapFutures =
            new ArrayList<Future<Map<Segment, SegmentWithData>>>();
        if (shouldUseGroupingFunction()) {
            LOGGER.debug("Using grouping sets");
            List<CompositeBatch> groupedBatches = groupBatches(batchList);
            for (CompositeBatch batch : groupedBatches) {
                batch.load(segmentMapFutures);
            }
        } else {
            // Load batches in turn.
            for (Batch batch : batchList) {
                batch.loadAggregation(segmentMapFutures);
            }
        }

        if (LOGGER.isDebugEnabled()) {
            final long t2 = System.currentTimeMillis();
            LOGGER.debug("load (millis): " + (t2 - t1));
        }

        // Create a response and return it to the client. The response is a
        // bunch of work to be done (waiting for segments to load from SQL, to
        // come from cache, and so forth) on the client's time. Some of the bets
        // may not come off, in which case, the client will send us another
        // request.
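        //
        // Roughly, the client (see FastBatchingCellReader.loadAggregations)
        // consumes the response like this:
        //
        //     for each header in response.cacheSegments:
        //         fetch the body from the external cache, register it locally
        //     for each rollup in response.rollups:
        //         roll up resident segments if all candidates are present
        //     wait on response.futures and the SQL segment futures,
        //         registering each resulting segment with the local star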
        return new LoadBatchResponse(
            cellRequests,
            new ArrayList<SegmentHeader>(cacheHeaders),
            rollups,
            converterMap,
            segmentMapFutures,
            futures);
    }

    static List<CompositeBatch> groupBatches(List<Batch> batchList) {
        Map<AggregationKey, CompositeBatch> batchGroups =
            new HashMap<AggregationKey, CompositeBatch>();
        for (int i = 0; i < batchList.size(); i++) {
            for (int j = i + 1; j < batchList.size();) {
                final Batch iBatch = batchList.get(i);
                final Batch jBatch = batchList.get(j);
                if (iBatch.canBatch(jBatch)) {
                    batchList.remove(j);
                    addToCompositeBatch(batchGroups, iBatch, jBatch);
                } else if (jBatch.canBatch(iBatch)) {
                    batchList.set(i, jBatch);
                    batchList.remove(j);
                    addToCompositeBatch(batchGroups, jBatch, iBatch);
                    j = i + 1;
                } else {
                    j++;
                }
            }
        }

        wrapNonBatchedBatchesWithCompositeBatches(batchList, batchGroups);
        final CompositeBatch[] compositeBatches =
            batchGroups.values().toArray(
                new CompositeBatch[batchGroups.size()]);
        Arrays.sort(compositeBatches, CompositeBatchComparator.instance);
        return Arrays.asList(compositeBatches);
    }

    private static void wrapNonBatchedBatchesWithCompositeBatches(
        List<Batch> batchList,
        Map<AggregationKey, CompositeBatch> batchGroups)
    {
        for (Batch batch : batchList) {
            if (batchGroups.get(batch.batchKey) == null) {
                batchGroups.put(batch.batchKey, new CompositeBatch(batch));
            }
        }
    }

    static void addToCompositeBatch(
        Map<AggregationKey, CompositeBatch> batchGroups,
        Batch detailedBatch,
        Batch summaryBatch)
    {
        CompositeBatch compositeBatch = batchGroups.get(detailedBatch.batchKey);

        if (compositeBatch == null) {
            compositeBatch = new CompositeBatch(detailedBatch);
            batchGroups.put(detailedBatch.batchKey, compositeBatch);
        }

        CompositeBatch compositeBatchOfSummaryBatch =
            batchGroups.remove(summaryBatch.batchKey);

        if (compositeBatchOfSummaryBatch != null) {
            compositeBatch.merge(compositeBatchOfSummaryBatch);
        } else {
            compositeBatch.add(summaryBatch);
        }
    }

    /**
     * Command that loads the segments required for a collection of cell
     * requests. Returns the collection of segments.
     */
    public static class LoadBatchCommand
        implements SegmentCacheManager.Command<LoadBatchResponse>
    {
        private final Locus locus;
        private final SegmentCacheManager cacheMgr;
        private final Dialect dialect;
        private final RolapCube cube;
        private final List<CellRequest> cellRequests;
        private final Map<String, Object> mdc =
            new HashMap<String, Object>();

        public LoadBatchCommand(
            Locus locus,
            SegmentCacheManager cacheMgr,
            Dialect dialect,
            RolapCube cube,
            List<CellRequest> cellRequests)
        {
            this.locus = locus;
            this.cacheMgr = cacheMgr;
            this.dialect = dialect;
            this.cube = cube;
            this.cellRequests = cellRequests;
            if (MDC.getContext() != null) {
                this.mdc.putAll(MDC.getContext());
            }
        }

        public LoadBatchResponse call() {
            if (MDC.getContext() != null) {
                final Map<String, Object> old = MDC.getContext();
                old.clear();
                old.putAll(mdc);
            }
            return new BatchLoader(locus, cacheMgr, dialect, cube)
                .load(cellRequests);
        }

        public Locus getLocus() {
            return locus;
        }
    }

    /**
     * Set of Batches which can be grouped together.
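     *
     * <p>A hypothetical example: a batch constrained on {@code (year, state)}
     * can act as the detailed batch for a summary batch constrained on
     * {@code (year)} only; the summary data is then obtained from the same
     * SQL statement via grouping sets rather than a separate query.</p>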
     */
    static class CompositeBatch {
        /** Batch with the most constraint columns */
        final Batch detailedBatch;

        /** Batches whose data can be fetched using rollup on detailed batch */
        final List<Batch> summaryBatches = new ArrayList<Batch>();

        CompositeBatch(Batch detailedBatch) {
            this.detailedBatch = detailedBatch;
        }

        void add(Batch summaryBatch) {
            summaryBatches.add(summaryBatch);
        }

        void merge(CompositeBatch summaryBatch) {
            summaryBatches.add(summaryBatch.detailedBatch);
            summaryBatches.addAll(summaryBatch.summaryBatches);
        }

        public void load(
            List<Future<Map<Segment, SegmentWithData>>> segmentFutures)
        {
            GroupingSetsCollector batchCollector =
                new GroupingSetsCollector(true);
            this.detailedBatch.loadAggregation(batchCollector, segmentFutures);

            int cellRequestCount = 0;
            for (Batch batch : summaryBatches) {
                batch.loadAggregation(batchCollector, segmentFutures);
                cellRequestCount += batch.cellRequestCount;
            }

            getSegmentLoader().load(
                cellRequestCount,
                batchCollector.getGroupingSets(),
                detailedBatch.batchKey.getCompoundPredicateList(),
                segmentFutures);
        }

        SegmentLoader getSegmentLoader() {
            return new SegmentLoader(detailedBatch.getCacheMgr());
        }
    }

    private static final Logger BATCH_LOGGER = Logger.getLogger(Batch.class);

    public static class RollupInfo {
        final RolapStar.Column[] constrainedColumns;
        final BitKey constrainedColumnsBitKey;
        final RolapStar.Measure measure;
        final List<List<SegmentHeader>> candidateLists;

        RollupInfo(
            CellRequest request,
            List<List<SegmentHeader>> candidateLists)
        {
            this.candidateLists = candidateLists;
            constrainedColumns = request.getConstrainedColumns();
            constrainedColumnsBitKey = request.getConstrainedColumnsBitKey();
            measure = request.getMeasure();
        }
    }

    /**
     * Request sent from cache manager to a worker to load segments into
     * the cache, create segments by rolling up, and to wait for segments
     * being loaded via SQL.
     */
    static class LoadBatchResponse {
        /**
         * List of segments that are being loaded using SQL.
         *
         * <p>Other workers are executing the SQL. When done, they will write a
         * segment body or an error into the respective futures. The thread
         * processing this request will wait on those futures, once all
         * segments have successfully arrived from cache.</p>
         */
        final List<Future<Map<Segment, SegmentWithData>>> sqlSegmentMapFutures;

        /**
         * List of segments we are trying to load from the cache.
         */
        final List<SegmentHeader> cacheSegments;

        /**
         * List of cell requests that will be satisfied by segments we are
         * trying to load from the cache (or create by rolling up).
         */
        final List<CellRequest> cellRequests;

        /**
         * List of segments to be created from segments in the cache, provided
         * that the cache segments come through.
         *
         * <p>If they do not, we will need to tell the cache manager to remove
         * the pending segments.</p>
         */
        final List<RollupInfo> rollups;

        final Map<List, SegmentBuilder.SegmentConverter> converterMap;

        final Map<SegmentHeader, Future<SegmentBody>> futures;

        LoadBatchResponse(
            List<CellRequest> cellRequests,
            List<SegmentHeader> cacheSegments,
            List<RollupInfo> rollups,
            Map<List, SegmentBuilder.SegmentConverter> converterMap,
            List<Future<Map<Segment, SegmentWithData>>> sqlSegmentMapFutures,
            Map<SegmentHeader, Future<SegmentBody>> futures)
        {
            this.cellRequests = cellRequests;
            this.sqlSegmentMapFutures = sqlSegmentMapFutures;
            this.cacheSegments = cacheSegments;
            this.rollups = rollups;
            this.converterMap = converterMap;
            this.futures = futures;
        }

        public SegmentWithData convert(
            SegmentHeader header,
            SegmentBody body)
        {
            final SegmentBuilder.SegmentConverter converter =
                converterMap.get(
                    SegmentCacheIndexImpl.makeConverterKey(header));
            return converter.convert(header, body);
        }
    }

    public class Batch {
        // the CellRequest's constrained columns
        final RolapStar.Column[] columns;
        final List<RolapStar.Measure> measuresList =
            new ArrayList<RolapStar.Measure>();
        final Set<StarColumnPredicate>[] valueSets;
        final AggregationKey batchKey;
        // string representation; for debug; set lazily in toString
        private String string;
        private int cellRequestCount;
        private List<StarColumnPredicate[]> tuples =
            new ArrayList<StarColumnPredicate[]>();

        public Batch(CellRequest request) {
            columns = request.getConstrainedColumns();
            valueSets = new HashSet[columns.length];
            for (int i = 0; i < valueSets.length; i++) {
                valueSets[i] = new HashSet<StarColumnPredicate>();
            }
            batchKey = new AggregationKey(request);
        }

        public String toString() {
            if (string == null) {
                final StringBuilder buf = new StringBuilder();
                buf.append("Batch {\n")
                    .append(" columns={").append(Arrays.toString(columns))
                    .append("}\n")
                    .append(" measures={").append(measuresList).append("}\n")
                    .append(" valueSets={").append(Arrays.toString(valueSets))
                    .append("}\n")
                    .append(" batchKey=").append(batchKey).append("}\n")
                    .append("}");
                string = buf.toString();
            }
            return string;
        }

        public final void add(CellRequest request) {
            ++cellRequestCount;
            final int valueCount = request.getNumValues();
            final StarColumnPredicate[] tuple =
                new StarColumnPredicate[valueCount];
            for (int j = 0; j < valueCount; j++) {
                final StarColumnPredicate value = request.getValueAt(j);
                valueSets[j].add(value);
                tuple[j] = value;
            }
            tuples.add(tuple);
            final RolapStar.Measure measure = request.getMeasure();
            if (!measuresList.contains(measure)) {
                assert (measuresList.size() == 0)
                    || (measure.getStar()
                    == (measuresList.get(0)).getStar())
                    : "Measure must belong to same star as other measures";
                measuresList.add(measure);
            }
        }

        /**
         * Returns the RolapStar associated with the Batch's first Measure.
         *
         * <p>This method can only be called after the {@link #add} method has
         * been called.
         *
         * @return the RolapStar associated with the Batch's first Measure
         */
        private RolapStar getStar() {
            RolapStar.Measure measure = measuresList.get(0);
            return measure.getStar();
        }

        public BitKey getConstrainedColumnsBitKey() {
            return batchKey.getConstrainedColumnsBitKey();
        }

        public SegmentCacheManager getCacheMgr() {
            return cacheMgr;
        }

        public final void loadAggregation(
            List<Future<Map<Segment, SegmentWithData>>> segmentFutures)
        {
            GroupingSetsCollector collectorWithGroupingSetsTurnedOff =
                new GroupingSetsCollector(false);
            loadAggregation(collectorWithGroupingSetsTurnedOff, segmentFutures);
        }

        final void loadAggregation(
            GroupingSetsCollector groupingSetsCollector,
            List<Future<Map<Segment, SegmentWithData>>> segmentFutures)
        {
            if (MondrianProperties.instance().GenerateAggregateSql.get()) {
                generateAggregateSql();
            }
            final StarColumnPredicate[] predicates = initPredicates();
            final long t1 = System.currentTimeMillis();

            // TODO: optimize key sets; drop a constraint if more than x% of
            // the members are requested; whether we should get just the cells
            // requested or expand to a n-cube

            // If the database cannot execute "count(distinct ...)", split the
            // distinct aggregations out.
            int distinctMeasureCount = getDistinctMeasureCount(measuresList);
            boolean tooManyDistinctMeasures =
                distinctMeasureCount > 0
                && !dialect.allowsCountDistinct()
                || distinctMeasureCount > 1
                && !dialect.allowsMultipleCountDistinct();

            if (tooManyDistinctMeasures) {
                doSpecialHandlingOfDistinctCountMeasures(
                    predicates,
                    groupingSetsCollector,
                    segmentFutures);
            }

            // Load agg(distinct <SQL expression>) measures individually
            // for DBs that do not allow multiple distinct SQL measures.
            if (!dialect.allowsMultipleDistinctSqlMeasures()) {
                // Note that the intention was originally to capture the
                // subquery SQL measures and separate them out; However,
                // without parsing the SQL string, Mondrian cannot distinguish
                // between "col1" + "col2" and subquery. Here the measure list
                // contains both types.

                // See the test case testLoadDistinctSqlMeasure() in
                // mondrian.rolap.FastBatchingCellReaderTest

                List<RolapStar.Measure> distinctSqlMeasureList =
                    getDistinctSqlMeasures(measuresList);
                for (RolapStar.Measure measure : distinctSqlMeasureList) {
                    AggregationManager.loadAggregation(
                        cacheMgr,
                        cellRequestCount,
                        Collections.singletonList(measure),
                        columns,
                        batchKey,
                        predicates,
                        groupingSetsCollector,
                        segmentFutures);
                    measuresList.remove(measure);
                }
            }

            final int measureCount = measuresList.size();
            if (measureCount > 0) {
                AggregationManager.loadAggregation(
                    cacheMgr,
                    cellRequestCount,
                    measuresList,
                    columns,
                    batchKey,
                    predicates,
                    groupingSetsCollector,
                    segmentFutures);
            }

            if (BATCH_LOGGER.isDebugEnabled()) {
                final long t2 = System.currentTimeMillis();
                BATCH_LOGGER.debug(
                    "Batch.load (millis) " + (t2 - t1));
            }
        }

        private void doSpecialHandlingOfDistinctCountMeasures(
            StarColumnPredicate[] predicates,
            GroupingSetsCollector groupingSetsCollector,
            List<Future<Map<Segment, SegmentWithData>>> segmentFutures)
        {
            while (true) {
                // Scan for a measure based upon a distinct aggregation.
                final RolapStar.Measure distinctMeasure =
                    getFirstDistinctMeasure(measuresList);
                if (distinctMeasure == null) {
                    break;
                }
                final String expr =
                    distinctMeasure.getExpression().getGenericExpression();
                final List<RolapStar.Measure> distinctMeasuresList =
                    new ArrayList<RolapStar.Measure>();
                for (int i = 0; i < measuresList.size();) {
                    final RolapStar.Measure measure = measuresList.get(i);
                    if (measure.getAggregator().isDistinct()
                        && measure.getExpression().getGenericExpression()
                            .equals(expr))
                    {
                        measuresList.remove(i);
                        distinctMeasuresList.add(distinctMeasure);
                    } else {
                        i++;
                    }
                }

                // Load all the distinct measures based on the same expression
                // together
                AggregationManager.loadAggregation(
                    cacheMgr,
                    cellRequestCount,
                    distinctMeasuresList,
                    columns,
                    batchKey,
                    predicates,
                    groupingSetsCollector,
                    segmentFutures);
            }
        }

        private StarColumnPredicate[] initPredicates() {
            StarColumnPredicate[] predicates =
                new StarColumnPredicate[columns.length];
            for (int j = 0; j < columns.length; j++) {
                Set<StarColumnPredicate> valueSet = valueSets[j];

                StarColumnPredicate predicate;
                if (valueSet == null) {
                    predicate = LiteralStarPredicate.FALSE;
                } else {
                    ValueColumnPredicate[] values =
                        valueSet.toArray(
                            new ValueColumnPredicate[valueSet.size()]);
                    // Sort array to achieve determinism in generated SQL.
                    Arrays.sort(
                        values,
                        ValueColumnConstraintComparator.instance);

                    predicate =
                        new ListColumnPredicate(
                            columns[j],
                            Arrays.asList((StarColumnPredicate[]) values));
                }

                predicates[j] = predicate;
            }
            return predicates;
        }

        private void generateAggregateSql() {
            if (cube == null || cube.isVirtual()) {
                final StringBuilder buf = new StringBuilder(64);
                buf.append(
                    "AggGen: Sorry, can not create SQL for virtual Cube \"")
                    .append(cube == null ? null : cube.getName())
                    .append("\", operation not currently supported");
                BATCH_LOGGER.error(buf.toString());

            } else {
                final AggGen aggGen =
                    new AggGen(cube.getName(), cube.getStar(), columns);
                if (aggGen.isReady()) {
                    // PRINT TO STDOUT - DO NOT USE BATCH_LOGGER
                    System.out.println(
                        "createLost:" + Util.nl + aggGen.createLost());
                    System.out.println(
                        "insertIntoLost:" + Util.nl + aggGen.insertIntoLost());
                    System.out.println(
                        "createCollapsed:" + Util.nl
                        + aggGen.createCollapsed());
                    System.out.println(
                        "insertIntoCollapsed:" + Util.nl
                        + aggGen.insertIntoCollapsed());
                } else {
                    BATCH_LOGGER.error("AggGen failed");
                }
            }
        }

        /**
         * Returns the first measure based upon a distinct aggregation, or null
         * if there is none.
         */
        final RolapStar.Measure getFirstDistinctMeasure(
            List<RolapStar.Measure> measuresList)
        {
            for (RolapStar.Measure measure : measuresList) {
                if (measure.getAggregator().isDistinct()) {
                    return measure;
                }
            }
            return null;
        }

        /**
         * Returns the number of measures based upon a distinct
         * aggregation.
         */
        private int getDistinctMeasureCount(
            List<RolapStar.Measure> measuresList)
        {
            int count = 0;
            for (RolapStar.Measure measure : measuresList) {
                if (measure.getAggregator().isDistinct()) {
                    ++count;
                }
            }
            return count;
        }

        /**
         * Returns the list of measures based upon a distinct aggregation
         * containing SQL measure expressions (as opposed to column
         * expressions).
         *
         * <p>This method was initially intended for only those measures that
         * are defined using subqueries (for DBs that support them). However,
         * since Mondrian does not parse the SQL string, the method will count
         * both queries as well as some non-query SQL expressions.</p>
         */
        private List<RolapStar.Measure> getDistinctSqlMeasures(
            List<RolapStar.Measure> measuresList)
        {
            List<RolapStar.Measure> distinctSqlMeasureList =
                new ArrayList<RolapStar.Measure>();
            for (RolapStar.Measure measure : measuresList) {
                if (measure.getAggregator().isDistinct()
                    && measure.getExpression() instanceof
                        MondrianDef.MeasureExpression)
                {
                    MondrianDef.MeasureExpression measureExpr =
                        (MondrianDef.MeasureExpression) measure.getExpression();
                    MondrianDef.SQL measureSql = measureExpr.expressions[0];
                    // Checks if the SQL contains "SELECT" to detect the case a
                    // subquery is used to define the measure. This is not a
                    // perfect check, because a SQL expression on column names
                    // containing "SELECT" will also be detected. e.g.
                    // count("select beef" + "regular beef").
                    if (measureSql.cdata.toUpperCase().contains("SELECT")) {
                        distinctSqlMeasureList.add(measure);
                    }
                }
            }
            return distinctSqlMeasureList;
        }

        /**
         * Returns whether another Batch can be batched to this Batch.
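         * For instance (a hypothetical illustration), a batch constrained on
         * {@code (year, state)} may absorb a batch constrained on
         * {@code (year)} alone, provided the conditions below hold.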
         *
         * <p>This is possible if:
         * <ul>
         * <li>columns list is a super set of the other batch's constraint
         *     columns; and
         * <li>both have the same fact table; and
         * <li>matching columns of this and the other batch have the same
         *     values; and
         * <li>non-matching columns of this batch have ALL VALUES
         * </ul>
         */
        boolean canBatch(Batch other) {
            return hasOverlappingBitKeys(other)
                && constraintsMatch(other)
                && hasSameMeasureList(other)
                && !hasDistinctCountMeasure()
                && !other.hasDistinctCountMeasure()
                && haveSameStarAndAggregation(other)
                && haveSameClosureColumns(other);
        }

        /**
         * Returns whether the constraints on this Batch subsume the
         * constraints on another Batch and therefore the other Batch can be
         * subsumed into this one for GROUPING SETS purposes. Not symmetric.
         *
         * @param other Other batch
         * @return Whether other batch can be subsumed into this one
         */
        private boolean constraintsMatch(Batch other) {
            if (areBothDistinctCountBatches(other)) {
                if (getConstrainedColumnsBitKey().equals(
                        other.getConstrainedColumnsBitKey()))
                {
                    return hasSameCompoundPredicate(other)
                        && haveSameValues(other);
                } else {
                    return hasSameCompoundPredicate(other)
                        || (other.batchKey.getCompoundPredicateList().isEmpty()
                            || equalConstraint(
                                batchKey.getCompoundPredicateList(),
                                other.batchKey.getCompoundPredicateList()))
                        && haveSameValues(other);
                }
            } else {
                return haveSameValues(other);
            }
        }

        private boolean equalConstraint(
            List<StarPredicate> predList1,
            List<StarPredicate> predList2)
        {
            if (predList1.size() != predList2.size()) {
                return false;
            }
            for (int i = 0; i < predList1.size(); i++) {
                StarPredicate pred1 = predList1.get(i);
                StarPredicate pred2 = predList2.get(i);
                if (!pred1.equalConstraint(pred2)) {
                    return false;
                }
            }
            return true;
        }

        private boolean areBothDistinctCountBatches(Batch other) {
            return this.hasDistinctCountMeasure()
                && !this.hasNormalMeasures()
                && other.hasDistinctCountMeasure()
                && !other.hasNormalMeasures();
        }

        private boolean hasNormalMeasures() {
            return getDistinctMeasureCount(measuresList)
                != measuresList.size();
        }

        private boolean hasSameMeasureList(Batch other) {
            return this.measuresList.size() == other.measuresList.size()
                && this.measuresList.containsAll(other.measuresList);
        }

        boolean hasOverlappingBitKeys(Batch other) {
            return getConstrainedColumnsBitKey()
                .isSuperSetOf(other.getConstrainedColumnsBitKey());
        }

        boolean hasDistinctCountMeasure() {
            return getDistinctMeasureCount(measuresList) > 0;
        }

        boolean hasSameCompoundPredicate(Batch other) {
            final StarPredicate starPredicate = compoundPredicate();
            final StarPredicate otherStarPredicate = other.compoundPredicate();
            if (starPredicate == null && otherStarPredicate == null) {
                return true;
            } else if (starPredicate != null && otherStarPredicate != null) {
                return starPredicate.equalConstraint(otherStarPredicate);
            }
            return false;
        }

        private StarPredicate compoundPredicate() {
            StarPredicate predicate = null;
            for (Set<StarColumnPredicate> valueSet : valueSets) {
                StarPredicate orPredicate = null;
                for (StarColumnPredicate starColumnPredicate : valueSet) {
                    if (orPredicate == null) {
                        orPredicate = starColumnPredicate;
                    } else {
                        orPredicate = orPredicate.or(starColumnPredicate);
                    }
                }
                if (predicate == null) {
                    predicate = orPredicate;
                } else {
                    predicate = predicate.and(orPredicate);
                }
            }
            for (StarPredicate starPredicate
                : batchKey.getCompoundPredicateList())
            {
                if (predicate == null) {
                    predicate = starPredicate;
                } else {
                    predicate = predicate.and(starPredicate);
                }
            }
            return predicate;
        }

        boolean haveSameStarAndAggregation(Batch other) {
            boolean rollup[] = {false};
            boolean otherRollup[] = {false};

            boolean hasSameAggregation =
                getAgg(rollup) == other.getAgg(otherRollup);
            boolean hasSameRollupOption = rollup[0] == otherRollup[0];

            boolean hasSameStar = getStar().equals(other.getStar());
            return hasSameStar && hasSameAggregation && hasSameRollupOption;
        }

        /**
         * Returns whether this batch has the same closure columns as another.
         *
         * <p>Ensures that we do not group together a batch that includes a
         * level of a parent-child closure dimension with a batch that does
         * not. It is not safe to roll up from a parent-child closure level;
         * due to multiple accounting, the 'all' level is less than the sum of
         * the members of the closure level.
         *
         * @param other Other batch
         * @return Whether batches have the same closure columns
         */
        boolean haveSameClosureColumns(Batch other) {
            final BitKey cubeClosureColumnBitKey = cube.closureColumnBitKey;
            if (cubeClosureColumnBitKey == null) {
                // Virtual cubes have a null bitkey. For now, punt; should do
                // better.
                return true;
            }
            final BitKey closureColumns =
                this.batchKey.getConstrainedColumnsBitKey()
                    .and(cubeClosureColumnBitKey);
            final BitKey otherClosureColumns =
                other.batchKey.getConstrainedColumnsBitKey()
                    .and(cubeClosureColumnBitKey);
            return closureColumns.equals(otherClosureColumns);
        }

        /**
         * @param rollup Out parameter
         * @return AggStar
         */
        private AggStar getAgg(boolean[] rollup) {
            return AggregationManager.findAgg(
                getStar(),
                getConstrainedColumnsBitKey(),
                makeMeasureBitKey(),
                rollup);
        }

        private BitKey makeMeasureBitKey() {
            BitKey bitKey = getConstrainedColumnsBitKey().emptyCopy();
            for (RolapStar.Measure measure : measuresList) {
                bitKey.set(measure.getBitPosition());
            }
            return bitKey;
        }

        /**
         * Returns whether this batch has the same values as another batch for
         * their common columns, and all values for columns that the other
         * batch does not constrain.
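         *
         * <p>A hypothetical example: if this batch is constrained on
         * {@code (year, state, gender)} and the other on {@code (year, state)},
         * the two must have identical value sets for {@code year} and
         * {@code state}, and this batch's {@code gender} set must contain
         * every value of that column.</p>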
         */
        boolean haveSameValues(
            Batch other)
        {
            for (int j = 0; j < columns.length; j++) {
                boolean isCommonColumn = false;
                for (int i = 0; i < other.columns.length; i++) {
                    if (areSameColumns(other.columns[i], columns[j])) {
                        if (hasSameValues(other.valueSets[i], valueSets[j])) {
                            isCommonColumn = true;
                            break;
                        } else {
                            return false;
                        }
                    }
                }
                if (!isCommonColumn
                    && !hasAllValues(columns[j], valueSets[j]))
                {
                    return false;
                }
            }
            return true;
        }

        private boolean hasAllValues(
            RolapStar.Column column,
            Set<StarColumnPredicate> valueSet)
        {
            return column.getCardinality() == valueSet.size();
        }

        private boolean areSameColumns(
            RolapStar.Column otherColumn,
            RolapStar.Column thisColumn)
        {
            return otherColumn.equals(thisColumn);
        }

        private boolean hasSameValues(
            Set<StarColumnPredicate> otherValueSet,
            Set<StarColumnPredicate> thisValueSet)
        {
            return otherValueSet.equals(thisValueSet);
        }
    }

    private static class CompositeBatchComparator
        implements Comparator<CompositeBatch>
    {
        static final CompositeBatchComparator instance =
            new CompositeBatchComparator();

        public int compare(CompositeBatch o1, CompositeBatch o2) {
            return BatchComparator.instance.compare(
                o1.detailedBatch,
                o2.detailedBatch);
        }
    }

    private static class BatchComparator implements Comparator<Batch> {
        static final BatchComparator instance = new BatchComparator();

        private BatchComparator() {
        }

        public int compare(
            Batch o1, Batch o2)
        {
            if (o1.columns.length != o2.columns.length) {
                return o1.columns.length - o2.columns.length;
            }
            for (int i = 0; i < o1.columns.length; i++) {
                int c = o1.columns[i].getName().compareTo(
                    o2.columns[i].getName());
                if (c != 0) {
                    return c;
                }
            }
            for (int i = 0; i < o1.columns.length; i++) {
                int c = compare(o1.valueSets[i], o2.valueSets[i]);
                if (c != 0) {
                    return c;
                }
            }
            return 0;
        }

        <T> int compare(Set<T> set1, Set<T> set2) {
            if (set1.size() != set2.size()) {
                return set1.size() - set2.size();
            }
            Iterator<T> iter1 = set1.iterator();
            Iterator<T> iter2 = set2.iterator();
            while (iter1.hasNext()) {
                T v1 = iter1.next();
                T v2 = iter2.next();
                int c = Util.compareKey(v1, v2);
                if (c != 0) {
                    return c;
                }
            }
            return 0;
        }
    }

    private static class ValueColumnConstraintComparator
        implements Comparator<ValueColumnPredicate>
    {
        static final ValueColumnConstraintComparator instance =
            new ValueColumnConstraintComparator();

        private ValueColumnConstraintComparator() {
        }

        public int compare(
            ValueColumnPredicate o1,
            ValueColumnPredicate o2)
        {
            Object v1 = o1.getValue();
            Object v2 = o2.getValue();
            if (v1.getClass() == v2.getClass()
                && v1 instanceof Comparable)
            {
                return ((Comparable) v1).compareTo(v2);
            } else {
                return v1.toString().compareTo(v2.toString());
            }
        }
    }

}

// End FastBatchingCellReader.java