001/* 002// This software is subject to the terms of the Eclipse Public License v1.0 003// Agreement, available at the following URL: 004// http://www.eclipse.org/legal/epl-v10.html. 005// You must accept the terms of that agreement to use this software. 006// 007// Copyright (C) 2001-2005 Julian Hyde 008// Copyright (C) 2005-2013 Pentaho and others 009// All Rights Reserved. 010// 011// jhyde, 30 August, 2001 012*/ 013package mondrian.rolap.agg; 014 015import mondrian.olap.CacheControl; 016import mondrian.olap.Exp; 017import mondrian.olap.MondrianProperties; 018import mondrian.olap.MondrianServer; 019import mondrian.olap.Util; 020import mondrian.rolap.*; 021import mondrian.rolap.SqlStatement.Type; 022import mondrian.rolap.aggmatcher.AggStar; 023import mondrian.server.Locus; 024import mondrian.util.Pair; 025 026import org.apache.log4j.Logger; 027 028import java.io.PrintWriter; 029import java.util.*; 030import java.util.concurrent.*; 031 032/** 033 * <code>RolapAggregationManager</code> manages all {@link Aggregation}s 034 * in the system. It is a singleton class. 035 * 036 * @author jhyde 037 * @since 30 August, 2001 038 */ 039public class AggregationManager extends RolapAggregationManager { 040 041 private static final MondrianProperties properties = 042 MondrianProperties.instance(); 043 044 private static final Logger LOGGER = 045 Logger.getLogger(AggregationManager.class); 046 047 public final SegmentCacheManager cacheMgr; 048 049 /** 050 * Creates the AggregationManager. 051 */ 052 public AggregationManager(MondrianServer server) { 053 if (properties.EnableCacheHitCounters.get()) { 054 LOGGER.error( 055 "Property " + properties.EnableCacheHitCounters.getPath() 056 + " is obsolete; ignored."); 057 } 058 this.cacheMgr = new SegmentCacheManager(server); 059 } 060 061 /** 062 * Returns the log4j logger. 063 * 064 * @return Logger 065 */ 066 public final Logger getLogger() { 067 return LOGGER; 068 } 069 070 /** 071 * Returns or creates the singleton. 072 * 073 * @deprecated No longer a singleton, and will be removed in mondrian-4. 074 * Use {@link mondrian.olap.MondrianServer#getAggregationManager()}. 075 * To get a server, call 076 * {@link mondrian.olap.MondrianServer#forConnection(mondrian.olap.Connection)}, 077 * passing in a null connection if you absolutely must. 078 */ 079 public static synchronized AggregationManager instance() { 080 return 081 MondrianServer.forId(null).getAggregationManager(); 082 } 083 084 /** 085 * Called by FastBatchingCellReader.load where the 086 * RolapStar creates an Aggregation if needed. 087 * 088 * @param cacheMgr Cache manager 089 * @param cellRequestCount Number of missed cells that led to this request 090 * @param measures Measures to load 091 * @param columns this is the CellRequest's constrained columns 092 * @param aggregationKey this is the CellRequest's constraint key 093 * @param predicates Array of constraints on each column 094 * @param groupingSetsCollector grouping sets collector 095 * @param segmentFutures List of futures into which each statement will 096 * place a list of the segments it has loaded, when it completes 097 */ 098 public static void loadAggregation( 099 SegmentCacheManager cacheMgr, 100 int cellRequestCount, 101 List<RolapStar.Measure> measures, 102 RolapStar.Column[] columns, 103 AggregationKey aggregationKey, 104 StarColumnPredicate[] predicates, 105 GroupingSetsCollector groupingSetsCollector, 106 List<Future<Map<Segment, SegmentWithData>>> segmentFutures) 107 { 108 RolapStar star = measures.get(0).getStar(); 109 Aggregation aggregation = 110 star.lookupOrCreateAggregation(aggregationKey); 111 112 // try to eliminate unnecessary constraints 113 // for Oracle: prevent an IN-clause with more than 1000 elements 114 predicates = aggregation.optimizePredicates(columns, predicates); 115 aggregation.load( 116 cacheMgr, cellRequestCount, columns, measures, predicates, 117 groupingSetsCollector, segmentFutures); 118 } 119 120 /** 121 * Returns an API with which to explicitly manage the contents of the cache. 122 * 123 * @param connection Server whose cache to control 124 * @param pw Print writer, for tracing 125 * @return CacheControl API 126 */ 127 public CacheControl getCacheControl( 128 RolapConnection connection, 129 final PrintWriter pw) 130 { 131 return new CacheControlImpl(connection) { 132 protected void flushNonUnion(final CellRegion region) { 133 final SegmentCacheManager.FlushResult result = 134 cacheMgr.execute( 135 new SegmentCacheManager.FlushCommand( 136 Locus.peek(), 137 cacheMgr, 138 region, 139 this)); 140 final List<Future<Boolean>> futures = 141 new ArrayList<Future<Boolean>>(); 142 for (Callable<Boolean> task : result.tasks) { 143 futures.add(cacheMgr.cacheExecutor.submit(task)); 144 } 145 for (Future<Boolean> future : futures) { 146 Util.discard(Util.safeGet(future, "Flush cache")); 147 } 148 } 149 150 public void flush(final CellRegion region) { 151 if (pw != null) { 152 pw.println("Cache state before flush:"); 153 printCacheState(pw, region); 154 pw.println(); 155 } 156 super.flush(region); 157 if (pw != null) { 158 pw.println("Cache state after flush:"); 159 printCacheState(pw, region); 160 pw.println(); 161 } 162 } 163 164 public void trace(final String message) { 165 if (pw != null) { 166 pw.println(message); 167 } 168 } 169 170 public boolean isTraceEnabled() { 171 return pw != null; 172 } 173 }; 174 } 175 176 public Object getCellFromCache(CellRequest request) { 177 return getCellFromCache(request, null); 178 } 179 180 public Object getCellFromCache(CellRequest request, PinSet pinSet) { 181 // NOTE: This method used to check both local (thread/statement) cache 182 // and global cache (segments in JVM, shared between statements). Now it 183 // only looks in local cache. This can be done without acquiring any 184 // locks, because the local cache is thread-local. If a segment that 185 // matches this cell-request in global cache, a call to 186 // SegmentCacheManager will copy it into local cache. 187 final RolapStar.Measure measure = request.getMeasure(); 188 return measure.getStar().getCellFromCache(request, pinSet); 189 } 190 191 public Object getCellFromAllCaches(CellRequest request) { 192 final RolapStar.Measure measure = request.getMeasure(); 193 return measure.getStar().getCellFromAllCaches(request); 194 } 195 196 public String getDrillThroughSql( 197 final DrillThroughCellRequest request, 198 final StarPredicate starPredicateSlicer, 199 List<Exp> fields, 200 final boolean countOnly) 201 { 202 DrillThroughQuerySpec spec = 203 new DrillThroughQuerySpec( 204 request, 205 starPredicateSlicer, 206 countOnly); 207 Pair<String, List<SqlStatement.Type>> pair = spec.generateSqlQuery(); 208 209 if (getLogger().isDebugEnabled()) { 210 getLogger().debug( 211 "DrillThroughSQL: " 212 + pair.left 213 + Util.nl); 214 } 215 216 return pair.left; 217 } 218 219 /** 220 * Generates the query to retrieve the cells for a list of segments. 221 * Called by Segment.load. 222 * 223 * @return A pair consisting of a SQL statement and a list of suggested 224 * types of columns 225 */ 226 public static Pair<String, List<SqlStatement.Type>> generateSql( 227 GroupingSetsList groupingSetsList, 228 List<StarPredicate> compoundPredicateList) 229 { 230 final RolapStar star = groupingSetsList.getStar(); 231 BitKey levelBitKey = groupingSetsList.getDefaultLevelBitKey(); 232 BitKey measureBitKey = groupingSetsList.getDefaultMeasureBitKey(); 233 234 // Check if using aggregates is enabled. 235 boolean hasCompoundPredicates = false; 236 if (compoundPredicateList != null && compoundPredicateList.size() > 0) { 237 // Do not use Aggregate tables if compound predicates are present. 238 hasCompoundPredicates = true; 239 } 240 if (MondrianProperties.instance().UseAggregates.get() 241 && !hasCompoundPredicates) 242 { 243 final boolean[] rollup = {false}; 244 AggStar aggStar = findAgg(star, levelBitKey, measureBitKey, rollup); 245 246 if (aggStar != null) { 247 // Got a match, hot damn 248 249 if (LOGGER.isDebugEnabled()) { 250 StringBuilder buf = new StringBuilder(256); 251 buf.append("MATCH: "); 252 buf.append(star.getFactTable().getAlias()); 253 buf.append(Util.nl); 254 buf.append(" foreign="); 255 buf.append(levelBitKey); 256 buf.append(Util.nl); 257 buf.append(" measure="); 258 buf.append(measureBitKey); 259 buf.append(Util.nl); 260 buf.append(" aggstar="); 261 buf.append(aggStar.getBitKey()); 262 buf.append(Util.nl); 263 buf.append("AggStar="); 264 buf.append(aggStar.getFactTable().getName()); 265 buf.append(Util.nl); 266 for (AggStar.Table.Column column 267 : aggStar.getFactTable().getColumns()) 268 { 269 buf.append(" "); 270 buf.append(column); 271 buf.append(Util.nl); 272 } 273 LOGGER.debug(buf.toString()); 274 } 275 276 AggQuerySpec aggQuerySpec = 277 new AggQuerySpec( 278 aggStar, rollup[0], groupingSetsList); 279 Pair<String, List<Type>> sql = aggQuerySpec.generateSqlQuery(); 280 281 if (LOGGER.isDebugEnabled()) { 282 LOGGER.debug( 283 "generateSqlQuery: sql=" 284 + sql.left); 285 } 286 287 return sql; 288 } 289 290 // No match, fall through and use fact table. 291 } 292 293 if (LOGGER.isDebugEnabled()) { 294 StringBuilder sb = new StringBuilder(); 295 sb.append("NO MATCH : "); 296 sb.append(star.getFactTable().getAlias()); 297 sb.append(Util.nl); 298 sb.append("Foreign columns bit key="); 299 sb.append(levelBitKey); 300 sb.append(Util.nl); 301 sb.append("Measure bit key= "); 302 sb.append(measureBitKey); 303 sb.append(Util.nl); 304 sb.append("Agg Stars=["); 305 sb.append(Util.nl); 306 for (AggStar aggStar : star.getAggStars()) { 307 sb.append(aggStar.toString()); 308 } 309 sb.append(Util.nl); 310 sb.append("]"); 311 LOGGER.debug(sb.toString()); 312 } 313 314 315 // Fact table query 316 SegmentArrayQuerySpec spec = 317 new SegmentArrayQuerySpec(groupingSetsList, compoundPredicateList); 318 319 Pair<String, List<SqlStatement.Type>> pair = spec.generateSqlQuery(); 320 321 if (LOGGER.isDebugEnabled()) { 322 LOGGER.debug( 323 "generateSqlQuery: sql=" + pair.left); 324 } 325 326 return pair; 327 } 328 329 /** 330 * Finds an aggregate table in the given star which has the desired levels 331 * and measures. Returns null if no aggregate table is suitable. 332 * 333 * <p>If there no aggregate is an exact match, returns a more 334 * granular aggregate which can be rolled up, and sets rollup to true. 335 * If one or more of the measures are distinct-count measures 336 * rollup is possible only in limited circumstances. 337 * 338 * @param star Star 339 * @param levelBitKey Set of levels 340 * @param measureBitKey Set of measures 341 * @param rollup Out parameter, is set to true if the aggregate is not 342 * an exact match 343 * @return An aggregate, or null if none is suitable. 344 */ 345 public static AggStar findAgg( 346 RolapStar star, 347 final BitKey levelBitKey, 348 final BitKey measureBitKey, 349 boolean[] rollup) 350 { 351 // If there is no distinct count measure, isDistinct == false, 352 // then all we want is an AggStar whose BitKey is a superset 353 // of the combined measure BitKey and foreign-key/level BitKey. 354 // 355 // On the other hand, if there is at least one distinct count 356 // measure, isDistinct == true, then what is wanted is an AggStar 357 // whose measure BitKey is a superset of the measure BitKey, 358 // whose level BitKey is an exact match and the aggregate table 359 // can NOT have any foreign keys. 360 assert rollup != null; 361 BitKey fullBitKey = levelBitKey.or(measureBitKey); 362 363 // The AggStars are already ordered from smallest to largest so 364 // we need only find the first one and return it. 365 for (AggStar aggStar : star.getAggStars()) { 366 // superset match 367 if (!aggStar.superSetMatch(fullBitKey)) { 368 continue; 369 } 370 boolean isDistinct = measureBitKey.intersects( 371 aggStar.getDistinctMeasureBitKey()); 372 373 // The AggStar has no "distinct count" measures so 374 // we can use it without looking any further. 375 if (!isDistinct) { 376 // Need to use SUM if the query levels don't match 377 // the agg stars levels, or if the agg star is not 378 // fully collapsed. 379 rollup[0] = !aggStar.isFullyCollapsed() 380 || (levelBitKey.isEmpty() 381 || !aggStar.getLevelBitKey().equals(levelBitKey)); 382 return aggStar; 383 } 384 385 // If there are distinct measures, we can only rollup in limited 386 // circumstances. 387 388 // No foreign keys (except when its used as a distinct count 389 // measure). 390 // Level key exact match. 391 // Measure superset match. 392 393 // Compute the core levels -- those which can be safely 394 // rolled up to. For example, 395 // if the measure is 'distinct customer count', 396 // and the agg table has levels customer_id, 397 // then gender is a core level. 398 final BitKey distinctMeasuresBitKey = 399 measureBitKey.and(aggStar.getDistinctMeasureBitKey()); 400 final BitSet distinctMeasures = distinctMeasuresBitKey.toBitSet(); 401 BitKey combinedLevelBitKey = null; 402 for (int k = distinctMeasures.nextSetBit(0); k >= 0; 403 k = distinctMeasures.nextSetBit(k + 1)) 404 { 405 final AggStar.FactTable.Measure distinctMeasure = 406 aggStar.lookupMeasure(k); 407 BitKey rollableLevelBitKey = 408 distinctMeasure.getRollableLevelBitKey(); 409 if (combinedLevelBitKey == null) { 410 combinedLevelBitKey = rollableLevelBitKey; 411 } else { 412 // TODO use '&=' to remove unnecessary copy 413 combinedLevelBitKey = 414 combinedLevelBitKey.and(rollableLevelBitKey); 415 } 416 } 417 418 if (aggStar.hasForeignKeys()) { 419/* 420 StringBuilder buf = new StringBuilder(256); 421 buf.append(""); 422 buf.append(star.getFactTable().getAlias()); 423 buf.append(Util.nl); 424 buf.append("foreign ="); 425 buf.append(levelBitKey); 426 buf.append(Util.nl); 427 buf.append("measure ="); 428 buf.append(measureBitKey); 429 buf.append(Util.nl); 430 buf.append("aggstar ="); 431 buf.append(aggStar.getBitKey()); 432 buf.append(Util.nl); 433 buf.append("distinct="); 434 buf.append(aggStar.getDistinctMeasureBitKey()); 435 buf.append(Util.nl); 436 buf.append("AggStar="); 437 buf.append(aggStar.getFactTable().getName()); 438 buf.append(Util.nl); 439 for (Iterator columnIter = 440 aggStar.getFactTable().getColumns().iterator(); 441 columnIter.hasNext();) { 442 AggStar.Table.Column column = 443 (AggStar.Table.Column) columnIter.next(); 444 buf.append(" "); 445 buf.append(column); 446 buf.append(Util.nl); 447 } 448System.out.println(buf.toString()); 449*/ 450 // This is a little pessimistic. If the measure is 451 // 'count(distinct customer_id)' and one of the foreign keys is 452 // 'customer_id' then it is OK to roll up. 453 454 // Some of the measures in this query are distinct count. 455 // Get all of the foreign key columns. 456 // For each such measure, is it based upon a foreign key. 457 // Are there any foreign keys left over. No, can use AggStar. 458 BitKey fkBitKey = aggStar.getForeignKeyBitKey().copy(); 459 for (AggStar.FactTable.Measure measure 460 : aggStar.getFactTable().getMeasures()) 461 { 462 if (measure.isDistinct()) { 463 if (measureBitKey.get(measure.getBitPosition())) { 464 fkBitKey.clear(measure.getBitPosition()); 465 } 466 } 467 } 468 if (!fkBitKey.isEmpty()) { 469 // there are foreign keys left so we can not use this 470 // AggStar. 471 continue; 472 } 473 } 474 475 if (!aggStar.select( 476 levelBitKey, combinedLevelBitKey, measureBitKey)) 477 { 478 continue; 479 } 480 481 if (levelBitKey.isEmpty()) { 482 // We won't be able to resolve a distinct count measure like 483 // this. We need to resolve the distinct values but we don't 484 // have any levels for which we constraint on. This would 485 // result in either a bloated value (non-distinct) or 486 // only the first (non-rolled-up) to be returned. 487 continue; 488 } 489 rollup[0] = !aggStar.getLevelBitKey().equals(levelBitKey); 490 return aggStar; 491 } 492 return null; 493 } 494 495 public PinSet createPinSet() { 496 return new PinSetImpl(); 497 } 498 499 public void shutdown() { 500 // Send a shutdown command and wait for it to return. 501 cacheMgr.shutdown(); 502 // Now we can cleanup. 503 for (SegmentCacheWorker worker : cacheMgr.segmentCacheWorkers) { 504 worker.shutdown(); 505 } 506 } 507 508 /** 509 * Implementation of {@link mondrian.rolap.RolapAggregationManager.PinSet} 510 * using a {@link HashSet}. 511 */ 512 public static class PinSetImpl 513 extends HashSet<Segment> 514 implements RolapAggregationManager.PinSet 515 { 516 } 517} 518 519// End AggregationManager.java