001/*
002// This software is subject to the terms of the Eclipse Public License v1.0
003// Agreement, available at the following URL:
004// http://www.eclipse.org/legal/epl-v10.html.
005// You must accept the terms of that agreement to use this software.
006//
007// Copyright (C) 2001-2005 Julian Hyde
008// Copyright (C) 2005-2013 Pentaho and others
009// All Rights Reserved.
010//
011// jhyde, 30 August, 2001
012*/
013package mondrian.rolap.agg;
014
015import mondrian.olap.CacheControl;
016import mondrian.olap.Exp;
017import mondrian.olap.MondrianProperties;
018import mondrian.olap.MondrianServer;
019import mondrian.olap.Util;
020import mondrian.rolap.*;
021import mondrian.rolap.SqlStatement.Type;
022import mondrian.rolap.aggmatcher.AggStar;
023import mondrian.server.Locus;
024import mondrian.util.Pair;
025
026import org.apache.log4j.Logger;
027
028import java.io.PrintWriter;
029import java.util.*;
030import java.util.concurrent.*;
031
032/**
 * <code>AggregationManager</code> manages the {@link Aggregation}s of a
 * {@link mondrian.olap.MondrianServer}. It is no longer a singleton; each
 * instance is created for the server passed to its constructor.
035 *
036 * @author jhyde
037 * @since 30 August, 2001
038 */
039public class AggregationManager extends RolapAggregationManager {
040
041    private static final MondrianProperties properties =
042        MondrianProperties.instance();
043
044    private static final Logger LOGGER =
045        Logger.getLogger(AggregationManager.class);
046
047    public final SegmentCacheManager cacheMgr;
048
049    /**
050     * Creates the AggregationManager.
051     */
052    public AggregationManager(MondrianServer server) {
053        if (properties.EnableCacheHitCounters.get()) {
054            LOGGER.error(
055                "Property " + properties.EnableCacheHitCounters.getPath()
056                + " is obsolete; ignored.");
057        }
058        this.cacheMgr = new SegmentCacheManager(server);
059    }
060
061    /**
062     * Returns the log4j logger.
063     *
064     * @return Logger
065     */
066    public final Logger getLogger() {
067        return LOGGER;
068    }
069
070    /**
071     * Returns or creates the singleton.
072     *
073     * @deprecated No longer a singleton, and will be removed in mondrian-4.
074     *   Use {@link mondrian.olap.MondrianServer#getAggregationManager()}.
075     *   To get a server, call
076     *   {@link mondrian.olap.MondrianServer#forConnection(mondrian.olap.Connection)},
077     *   passing in a null connection if you absolutely must.
078     */
079    public static synchronized AggregationManager instance() {
080        return
081            MondrianServer.forId(null).getAggregationManager();
082    }
083
084    /**
085     * Called by FastBatchingCellReader.load where the
086     * RolapStar creates an Aggregation if needed.
087     *
088     * @param cacheMgr Cache manager
089     * @param cellRequestCount Number of missed cells that led to this request
090     * @param measures Measures to load
091     * @param columns this is the CellRequest's constrained columns
092     * @param aggregationKey this is the CellRequest's constraint key
093     * @param predicates Array of constraints on each column
094     * @param groupingSetsCollector grouping sets collector
095     * @param segmentFutures List of futures into which each statement will
096     *     place a list of the segments it has loaded, when it completes
097     */
098    public static void loadAggregation(
099        SegmentCacheManager cacheMgr,
100        int cellRequestCount,
101        List<RolapStar.Measure> measures,
102        RolapStar.Column[] columns,
103        AggregationKey aggregationKey,
104        StarColumnPredicate[] predicates,
105        GroupingSetsCollector groupingSetsCollector,
106        List<Future<Map<Segment, SegmentWithData>>> segmentFutures)
107    {
108        RolapStar star = measures.get(0).getStar();
109        Aggregation aggregation =
110            star.lookupOrCreateAggregation(aggregationKey);
111
112        // try to eliminate unnecessary constraints
113        // for Oracle: prevent an IN-clause with more than 1000 elements
114        predicates = aggregation.optimizePredicates(columns, predicates);
115        aggregation.load(
116            cacheMgr, cellRequestCount, columns, measures, predicates,
117            groupingSetsCollector, segmentFutures);
118    }
119
120    /**
121     * Returns an API with which to explicitly manage the contents of the cache.
122     *
123     * @param connection Server whose cache to control
124     * @param pw Print writer, for tracing
125     * @return CacheControl API
126     */
127    public CacheControl getCacheControl(
128        RolapConnection connection,
129        final PrintWriter pw)
130    {
131        return new CacheControlImpl(connection) {
132            protected void flushNonUnion(final CellRegion region) {
133                final SegmentCacheManager.FlushResult result =
134                    cacheMgr.execute(
135                        new SegmentCacheManager.FlushCommand(
136                            Locus.peek(),
137                            cacheMgr,
138                            region,
139                            this));
140                final List<Future<Boolean>> futures =
141                    new ArrayList<Future<Boolean>>();
142                for (Callable<Boolean> task : result.tasks) {
143                    futures.add(cacheMgr.cacheExecutor.submit(task));
144                }
145                for (Future<Boolean> future : futures) {
146                    Util.discard(Util.safeGet(future, "Flush cache"));
147                }
148            }
149
150            public void flush(final CellRegion region) {
151                if (pw != null) {
152                    pw.println("Cache state before flush:");
153                    printCacheState(pw, region);
154                    pw.println();
155                }
156                super.flush(region);
157                if (pw != null) {
158                    pw.println("Cache state after flush:");
159                    printCacheState(pw, region);
160                    pw.println();
161                }
162            }
163
164            public void trace(final String message) {
165                if (pw != null) {
166                    pw.println(message);
167                }
168            }
169
170            public boolean isTraceEnabled() {
171                return pw != null;
172            }
173        };
174    }
175
176    public Object getCellFromCache(CellRequest request) {
177        return getCellFromCache(request, null);
178    }
179
180    public Object getCellFromCache(CellRequest request, PinSet pinSet) {
181        // NOTE: This method used to check both local (thread/statement) cache
182        // and global cache (segments in JVM, shared between statements). Now it
183        // only looks in local cache. This can be done without acquiring any
184        // locks, because the local cache is thread-local. If a segment that
185        // matches this cell-request in global cache, a call to
186        // SegmentCacheManager will copy it into local cache.
187        final RolapStar.Measure measure = request.getMeasure();
188        return measure.getStar().getCellFromCache(request, pinSet);
189    }
190
191    public Object getCellFromAllCaches(CellRequest request) {
192        final RolapStar.Measure measure = request.getMeasure();
193        return measure.getStar().getCellFromAllCaches(request);
194    }
195
196    public String getDrillThroughSql(
197        final DrillThroughCellRequest request,
198        final StarPredicate starPredicateSlicer,
199        List<Exp> fields,
200        final boolean countOnly)
201    {
202        DrillThroughQuerySpec spec =
203            new DrillThroughQuerySpec(
204                request,
205                starPredicateSlicer,
206                countOnly);
207        Pair<String, List<SqlStatement.Type>> pair = spec.generateSqlQuery();
208
209        if (getLogger().isDebugEnabled()) {
210            getLogger().debug(
211                "DrillThroughSQL: "
212                + pair.left
213                + Util.nl);
214        }
215
216        return pair.left;
217    }
218
219    /**
220     * Generates the query to retrieve the cells for a list of segments.
221     * Called by Segment.load.
222     *
223     * @return A pair consisting of a SQL statement and a list of suggested
224     *     types of columns
225     */
226    public static Pair<String, List<SqlStatement.Type>> generateSql(
227        GroupingSetsList groupingSetsList,
228        List<StarPredicate> compoundPredicateList)
229    {
230        final RolapStar star = groupingSetsList.getStar();
231        BitKey levelBitKey = groupingSetsList.getDefaultLevelBitKey();
232        BitKey measureBitKey = groupingSetsList.getDefaultMeasureBitKey();
233
234        // Check if using aggregates is enabled.
235        boolean hasCompoundPredicates = false;
236        if (compoundPredicateList != null && compoundPredicateList.size() > 0) {
237            // Do not use Aggregate tables if compound predicates are present.
238            hasCompoundPredicates = true;
239        }
240        if (MondrianProperties.instance().UseAggregates.get()
241             && !hasCompoundPredicates)
242        {
243            final boolean[] rollup = {false};
244            AggStar aggStar = findAgg(star, levelBitKey, measureBitKey, rollup);
245
246            if (aggStar != null) {
247                // Got a match, hot damn
248
249                if (LOGGER.isDebugEnabled()) {
250                    StringBuilder buf = new StringBuilder(256);
251                    buf.append("MATCH: ");
252                    buf.append(star.getFactTable().getAlias());
253                    buf.append(Util.nl);
254                    buf.append("   foreign=");
255                    buf.append(levelBitKey);
256                    buf.append(Util.nl);
257                    buf.append("   measure=");
258                    buf.append(measureBitKey);
259                    buf.append(Util.nl);
260                    buf.append("   aggstar=");
261                    buf.append(aggStar.getBitKey());
262                    buf.append(Util.nl);
263                    buf.append("AggStar=");
264                    buf.append(aggStar.getFactTable().getName());
265                    buf.append(Util.nl);
266                    for (AggStar.Table.Column column
267                        : aggStar.getFactTable().getColumns())
268                    {
269                        buf.append("   ");
270                        buf.append(column);
271                        buf.append(Util.nl);
272                    }
273                    LOGGER.debug(buf.toString());
274                }
275
276                AggQuerySpec aggQuerySpec =
277                    new AggQuerySpec(
278                        aggStar, rollup[0], groupingSetsList);
279                Pair<String, List<Type>> sql = aggQuerySpec.generateSqlQuery();
280
281                if (LOGGER.isDebugEnabled()) {
282                    LOGGER.debug(
283                        "generateSqlQuery: sql="
284                        + sql.left);
285                }
286
287                return sql;
288            }
289
290            // No match, fall through and use fact table.
291        }
292
293        if (LOGGER.isDebugEnabled()) {
294            StringBuilder sb = new StringBuilder();
295            sb.append("NO MATCH : ");
296            sb.append(star.getFactTable().getAlias());
297            sb.append(Util.nl);
298            sb.append("Foreign columns bit key=");
299            sb.append(levelBitKey);
300            sb.append(Util.nl);
301            sb.append("Measure bit key=        ");
302            sb.append(measureBitKey);
303            sb.append(Util.nl);
304            sb.append("Agg Stars=[");
305            sb.append(Util.nl);
306            for (AggStar aggStar : star.getAggStars()) {
307                sb.append(aggStar.toString());
308            }
309            sb.append(Util.nl);
310            sb.append("]");
311            LOGGER.debug(sb.toString());
312        }
313
314
315        // Fact table query
316        SegmentArrayQuerySpec spec =
317            new SegmentArrayQuerySpec(groupingSetsList, compoundPredicateList);
318
319        Pair<String, List<SqlStatement.Type>> pair = spec.generateSqlQuery();
320
321        if (LOGGER.isDebugEnabled()) {
322            LOGGER.debug(
323                "generateSqlQuery: sql=" + pair.left);
324        }
325
326        return pair;
327    }
328
329    /**
330     * Finds an aggregate table in the given star which has the desired levels
331     * and measures. Returns null if no aggregate table is suitable.
332     *
333     * <p>If there no aggregate is an exact match, returns a more
334     * granular aggregate which can be rolled up, and sets rollup to true.
335     * If one or more of the measures are distinct-count measures
336     * rollup is possible only in limited circumstances.
337     *
338     * @param star Star
339     * @param levelBitKey Set of levels
340     * @param measureBitKey Set of measures
341     * @param rollup Out parameter, is set to true if the aggregate is not
342     *   an exact match
343     * @return An aggregate, or null if none is suitable.
344     */
345    public static AggStar findAgg(
346        RolapStar star,
347        final BitKey levelBitKey,
348        final BitKey measureBitKey,
349        boolean[] rollup)
350    {
351        // If there is no distinct count measure, isDistinct == false,
352        // then all we want is an AggStar whose BitKey is a superset
353        // of the combined measure BitKey and foreign-key/level BitKey.
354        //
355        // On the other hand, if there is at least one distinct count
356        // measure, isDistinct == true, then what is wanted is an AggStar
357        // whose measure BitKey is a superset of the measure BitKey,
358        // whose level BitKey is an exact match and the aggregate table
359        // can NOT have any foreign keys.
360        assert rollup != null;
361        BitKey fullBitKey = levelBitKey.or(measureBitKey);
362
363        // The AggStars are already ordered from smallest to largest so
364        // we need only find the first one and return it.
365        for (AggStar aggStar : star.getAggStars()) {
366            // superset match
367            if (!aggStar.superSetMatch(fullBitKey)) {
368                continue;
369            }
370            boolean isDistinct = measureBitKey.intersects(
371                aggStar.getDistinctMeasureBitKey());
372
373            // The AggStar has no "distinct count" measures so
374            // we can use it without looking any further.
375            if (!isDistinct) {
376                // Need to use SUM if the query levels don't match
377                // the agg stars levels, or if the agg star is not
378                // fully collapsed.
379                rollup[0] = !aggStar.isFullyCollapsed()
380                    || (levelBitKey.isEmpty()
381                    || !aggStar.getLevelBitKey().equals(levelBitKey));
382                return aggStar;
383            }
384
385            // If there are distinct measures, we can only rollup in limited
386            // circumstances.
387
388            // No foreign keys (except when its used as a distinct count
389            //   measure).
390            // Level key exact match.
391            // Measure superset match.
392
393            // Compute the core levels -- those which can be safely
394            // rolled up to. For example,
395            // if the measure is 'distinct customer count',
396            // and the agg table has levels customer_id,
397            // then gender is a core level.
398            final BitKey distinctMeasuresBitKey =
399                measureBitKey.and(aggStar.getDistinctMeasureBitKey());
400            final BitSet distinctMeasures = distinctMeasuresBitKey.toBitSet();
401            BitKey combinedLevelBitKey = null;
402            for (int k = distinctMeasures.nextSetBit(0); k >= 0;
403                k = distinctMeasures.nextSetBit(k + 1))
404            {
405                final AggStar.FactTable.Measure distinctMeasure =
406                    aggStar.lookupMeasure(k);
407                BitKey rollableLevelBitKey =
408                    distinctMeasure.getRollableLevelBitKey();
409                if (combinedLevelBitKey == null) {
410                    combinedLevelBitKey = rollableLevelBitKey;
411                } else {
412                    // TODO use '&=' to remove unnecessary copy
413                    combinedLevelBitKey =
414                        combinedLevelBitKey.and(rollableLevelBitKey);
415                }
416            }
417
418            if (aggStar.hasForeignKeys()) {
419/*
420                    StringBuilder buf = new StringBuilder(256);
421                    buf.append("");
422                    buf.append(star.getFactTable().getAlias());
423                    buf.append(Util.nl);
424                    buf.append("foreign =");
425                    buf.append(levelBitKey);
426                    buf.append(Util.nl);
427                    buf.append("measure =");
428                    buf.append(measureBitKey);
429                    buf.append(Util.nl);
430                    buf.append("aggstar =");
431                    buf.append(aggStar.getBitKey());
432                    buf.append(Util.nl);
433                    buf.append("distinct=");
434                    buf.append(aggStar.getDistinctMeasureBitKey());
435                    buf.append(Util.nl);
436                    buf.append("AggStar=");
437                    buf.append(aggStar.getFactTable().getName());
438                    buf.append(Util.nl);
439                    for (Iterator columnIter =
440                            aggStar.getFactTable().getColumns().iterator();
441                         columnIter.hasNext();) {
442                        AggStar.Table.Column column =
443                                (AggStar.Table.Column) columnIter.next();
444                        buf.append("   ");
445                        buf.append(column);
446                        buf.append(Util.nl);
447                    }
448System.out.println(buf.toString());
449*/
450                // This is a little pessimistic. If the measure is
451                // 'count(distinct customer_id)' and one of the foreign keys is
452                // 'customer_id' then it is OK to roll up.
453
454                // Some of the measures in this query are distinct count.
455                // Get all of the foreign key columns.
456                // For each such measure, is it based upon a foreign key.
457                // Are there any foreign keys left over. No, can use AggStar.
458                BitKey fkBitKey = aggStar.getForeignKeyBitKey().copy();
459                for (AggStar.FactTable.Measure measure
460                    : aggStar.getFactTable().getMeasures())
461                {
462                    if (measure.isDistinct()) {
463                        if (measureBitKey.get(measure.getBitPosition())) {
464                            fkBitKey.clear(measure.getBitPosition());
465                        }
466                    }
467                }
468                if (!fkBitKey.isEmpty()) {
469                    // there are foreign keys left so we can not use this
470                    // AggStar.
471                    continue;
472                }
473            }
474
475            if (!aggStar.select(
476                    levelBitKey, combinedLevelBitKey, measureBitKey))
477            {
478                continue;
479            }
480
481            if (levelBitKey.isEmpty()) {
482                // We won't be able to resolve a distinct count measure like
483                // this. We need to resolve the distinct values but we don't
484                // have any levels for which we constraint on. This would
485                // result in either a bloated value (non-distinct) or
486                // only the first (non-rolled-up) to be returned.
487                continue;
488            }
489            rollup[0] = !aggStar.getLevelBitKey().equals(levelBitKey);
490            return aggStar;
491        }
492        return null;
493    }
494
495    public PinSet createPinSet() {
496        return new PinSetImpl();
497    }
498
499    public void shutdown() {
500        // Send a shutdown command and wait for it to return.
501        cacheMgr.shutdown();
502        // Now we can cleanup.
503        for (SegmentCacheWorker worker : cacheMgr.segmentCacheWorkers) {
504            worker.shutdown();
505        }
506    }
507
508    /**
509     * Implementation of {@link mondrian.rolap.RolapAggregationManager.PinSet}
510     * using a {@link HashSet}.
511     */
512    public static class PinSetImpl
513        extends HashSet<Segment>
514        implements RolapAggregationManager.PinSet
515    {
516    }
517}
518
519// End AggregationManager.java