001/*
002// This software is subject to the terms of the Eclipse Public License v1.0
003// Agreement, available at the following URL:
004// http://www.eclipse.org/legal/epl-v10.html.
005// You must accept the terms of that agreement to use this software.
006//
007// Copyright (C) 2004-2005 Julian Hyde
008// Copyright (C) 2005-2013 Pentaho and others
009// All Rights Reserved.
010*/
011package mondrian.rolap;
012
013import mondrian.olap.*;
014import mondrian.rolap.agg.*;
015import mondrian.rolap.aggmatcher.AggGen;
016import mondrian.rolap.aggmatcher.AggStar;
017import mondrian.rolap.cache.SegmentCacheIndex;
018import mondrian.rolap.cache.SegmentCacheIndexImpl;
019import mondrian.server.Execution;
020import mondrian.server.Locus;
021import mondrian.spi.*;
022import mondrian.util.*;
023
024import org.apache.log4j.Logger;
025import org.apache.log4j.MDC;
026
027import java.util.*;
028import java.util.concurrent.Future;
029
030/**
031 * A <code>FastBatchingCellReader</code> doesn't really Read cells: when asked
032 * to look up the values of stored measures, it lies, and records the fact
033 * that the value was asked for.  Later, we can look over the values which
034 * are required, fetch them in an efficient way, and re-run the evaluation
035 * with a real evaluator.
036 *
037 * <p>NOTE: When it doesn't know the answer, it lies by returning an error
038 * object.  The calling code must be able to deal with that.</p>
039 *
040 * <p>This class tries to minimize the amount of storage needed to record the
041 * fact that a cell was requested.</p>
042 */
043public class FastBatchingCellReader implements CellReader {
044    private static final Logger LOGGER =
045        Logger.getLogger(FastBatchingCellReader.class);
046
047    private final int cellRequestLimit;
048
049    private final RolapCube cube;
050
051    /**
052     * Records the number of requests. The field is used for correctness: if
053     * the request count stays the same during an operation, you know that the
054     * FastBatchingCellReader has not told any lies during that operation, and
055     * therefore the result is true. The field is also useful for debugging.
056     */
057    private int missCount;
058
059    /**
060     * Number of occasions that a requested cell was already in cache.
061     */
062    private int hitCount;
063
064    /**
065     * Number of occasions that requested cell was in the process of being
066     * loaded into cache but not ready.
067     */
068    private int pendingCount;
069
070    private final AggregationManager aggMgr;
071
072    private final boolean cacheEnabled;
073
074    private final SegmentCacheManager cacheMgr;
075
076    private final RolapAggregationManager.PinSet pinnedSegments;
077
078    /**
079     * Indicates that the reader has given incorrect results.
080     */
081    private boolean dirty;
082
083    private final List<CellRequest> cellRequests = new ArrayList<CellRequest>();
084
085    private final Execution execution;
086
087    /**
088     * Creates a FastBatchingCellReader.
089     *
090     * @param execution Execution that calling statement belongs to. Allows us
091     *                  to check for cancel
092     * @param cube      Cube that requests belong to
093     * @param aggMgr    Aggregation manager
094     */
095    public FastBatchingCellReader(
096        Execution execution,
097        RolapCube cube,
098        AggregationManager aggMgr)
099    {
100        this.execution = execution;
101        assert cube != null;
102        assert execution != null;
103        this.cube = cube;
104        this.aggMgr = aggMgr;
105        cacheMgr = aggMgr.cacheMgr;
106        pinnedSegments = this.aggMgr.createPinSet();
107        cacheEnabled = !MondrianProperties.instance().DisableCaching.get();
108
109        cellRequestLimit =
110            MondrianProperties.instance().CellBatchSize.get() <= 0
111                ? 100000 // TODO Make this logic into a pluggable algorithm.
112                : MondrianProperties.instance().CellBatchSize.get();
113    }
114
115    public Object get(RolapEvaluator evaluator) {
116        final CellRequest request =
117            RolapAggregationManager.makeRequest(evaluator);
118
119        if (request == null || request.isUnsatisfiable()) {
120            return Util.nullValue; // request not satisfiable.
121        }
122
123        // Try to retrieve a cell and simultaneously pin the segment which
124        // contains it.
125        final Object o = aggMgr.getCellFromCache(request, pinnedSegments);
126
127        assert o != Boolean.TRUE : "getCellFromCache no longer returns TRUE";
128        if (o != null) {
129            ++hitCount;
130            return o;
131        }
132
133        // If this query has not had any cache misses, it's worth doing a
134        // synchronous request for the cell segment. If it is in the cache, it
135        // will be worth the wait, because we can avoid the effort of batching
136        // up requests that could have been satisfied by the same segment.
137        if (cacheEnabled
138            && missCount == 0)
139        {
140            SegmentWithData segmentWithData = cacheMgr.peek(request);
141            if (segmentWithData != null) {
142                segmentWithData.getStar().register(segmentWithData);
143                final Object o2 =
144                    aggMgr.getCellFromCache(request, pinnedSegments);
145                if (o2 != null) {
146                    ++hitCount;
147                    return o2;
148                }
149            }
150        }
151
152        // if there is no such cell, record that we need to fetch it, and
153        // return 'error'
154        recordCellRequest(request);
155        return RolapUtil.valueNotReadyException;
156    }
157
158    public int getMissCount() {
159        return missCount;
160    }
161
162    public int getHitCount() {
163        return hitCount;
164    }
165
166    public int getPendingCount() {
167        return pendingCount;
168    }
169
170    public final void recordCellRequest(CellRequest request) {
171        assert !request.isUnsatisfiable();
172        ++missCount;
173        cellRequests.add(request);
174        if (cellRequests.size() % cellRequestLimit == 0) {
175            // Signal that it's time to ask the cache manager if it has cells
176            // we need in the cache. Not really an exception.
177            throw CellRequestQuantumExceededException.INSTANCE;
178        }
179    }
180
181    /**
182     * Returns whether this reader has told a lie. This is the case if there
183     * are pending batches to load or if {@link #setDirty(boolean)} has been
184     * called.
185     */
186    public boolean isDirty() {
187        return dirty || !cellRequests.isEmpty();
188    }
189
190    /**
191     * Resolves any pending cell reads using the cache. After calling this
192     * method, all cells requested in a given batch are loaded into this
193     * statement's local cache.
194     *
195     * <p>The method is implemented by making an asynchronous call to the cache
196     * manager. The result is a list of segments that satisfies every cell
197     * request.</p>
198     *
199     * <p>The client should put the resulting segments into its "query local"
200     * cache, to ensure that future cells in that segment can be answered
201     * without a call to the cache manager. (That is probably 1000x faster.)</p>
202     *
203     * <p>The cache manager does not inform where client where each segment
204     * came from. There are several possibilities:</p>
205     *
206     * <ul>
207     *     <li>Segment was already in cache (header and body)</li>
208     *     <li>Segment is in the process of being loaded by executing a SQL
209     *     statement (probably due to a request from another client)</li>
210     *     <li>Segment is in an external cache (that is, header is in the cache,
211     *        body is not yet)</li>
212     *     <li>Segment can be created by rolling up one or more cache segments.
213     *        (And of course each of these segments might be "paged out".)</li>
214     *     <li>By executing a SQL {@code GROUP BY} statement</li>
215     * </ul>
216     *
217     * <p>Furthermore, segments in external cache may take some time to retrieve
218     * (a LAN round trip, say 1 millisecond, is a reasonable guess); and the
219     * request may fail. (It depends on the cache, but caches are at liberty
220     * to 'forget' segments.) So, any strategy that relies on cache segments
221     * should be able to fall back. Even if there are fall backs, only one call
222     * needs to be made to the cache manager.</p>
223     *
224     * @return Whether any aggregations were loaded.
225     */
226    boolean loadAggregations() {
227        if (!isDirty()) {
228            return false;
229        }
230
231        // List of futures yielding segments populated by SQL statements. If
232        // loading requires several iterations, we just append to the list. We
233        // don't mind if it takes a while for SQL statements to return.
234        final List<Future<Map<Segment, SegmentWithData>>> sqlSegmentMapFutures =
235            new ArrayList<Future<Map<Segment, SegmentWithData>>>();
236
237        final List<CellRequest> cellRequests1 =
238            new ArrayList<CellRequest>(cellRequests);
239
240        for (int iteration = 0;; ++iteration) {
241            final BatchLoader.LoadBatchResponse response =
242                cacheMgr.execute(
243                    new BatchLoader.LoadBatchCommand(
244                        Locus.peek(),
245                        cacheMgr,
246                        getDialect(),
247                        cube,
248                        Collections.unmodifiableList(cellRequests1)));
249
250            int failureCount = 0;
251
252            // Segments that have been retrieved from cache this cycle. Allows
253            // us to reduce calls to the external cache.
254            Map<SegmentHeader, SegmentBody> headerBodies =
255                new HashMap<SegmentHeader, SegmentBody>();
256
257            // Load each suggested segment from cache, and place it in
258            // thread-local cache. Note that this step can't be done by the
259            // cacheMgr -- it's our cache.
260            for (SegmentHeader header : response.cacheSegments) {
261                final SegmentBody body = cacheMgr.compositeCache.get(header);
262                if (body == null) {
263                    // REVIEW: This is an async call. It will return before the
264                    // index is informed that this header is there,
265                    // so a LoadBatchCommand might still return
266                    // it on the next iteration.
267                    if (cube.getStar() != null) {
268                        cacheMgr.remove(cube.getStar(), header);
269                    }
270                    ++failureCount;
271                    continue;
272                }
273                headerBodies.put(header, body);
274                final SegmentWithData segmentWithData =
275                    response.convert(header, body);
276                segmentWithData.getStar().register(segmentWithData);
277            }
278
279            // Perform each suggested rollup.
280            //
281            // TODO this could be improved.
282            // See http://jira.pentaho.com/browse/MONDRIAN-1195
283
284            // Rollups that succeeded. Will tell cache mgr to put the headers
285            // into the index and the header/bodies in cache.
286            final Map<SegmentHeader, SegmentBody> succeededRollups =
287                new HashMap<SegmentHeader, SegmentBody>();
288
289            for (final BatchLoader.RollupInfo rollup : response.rollups) {
290                // Gather the required segments.
291                Map<SegmentHeader, SegmentBody> map =
292                    findResidentRollupCandidate(headerBodies, rollup);
293                if (map == null) {
294                    // None of the candidate segment-sets for this rollup was
295                    // all present in the cache.
296                    continue;
297                }
298
299                final Set<String> keepColumns = new HashSet<String>();
300                for (RolapStar.Column column : rollup.constrainedColumns) {
301                    keepColumns.add(
302                        column.getExpression().getGenericExpression());
303                }
304                Pair<SegmentHeader, SegmentBody> rollupHeaderBody =
305                    SegmentBuilder.rollup(
306                        map,
307                        keepColumns,
308                        rollup.constrainedColumnsBitKey,
309                        rollup.measure.getAggregator().getRollup(),
310                        rollup.measure.getDatatype());
311
312                final SegmentHeader header = rollupHeaderBody.left;
313                final SegmentBody body = rollupHeaderBody.right;
314
315                if (headerBodies.containsKey(header)) {
316                    // We had already created this segment, somehow.
317                    continue;
318                }
319
320                headerBodies.put(header, body);
321                succeededRollups.put(header, body);
322
323                final SegmentWithData segmentWithData =
324                    response.convert(header, body);
325
326                // Register this segment with the local star.
327                segmentWithData.getStar().register(segmentWithData);
328
329                // Make sure that the cache manager knows about this new
330                // segment. First thing we do is to add it to the index.
331                // Then we insert the segment body into the SlotFuture.
332                // This has to be done on the SegmentCacheManager's
333                // Actor thread to ensure thread safety.
334                if (!MondrianProperties.instance().DisableCaching.get()) {
335                    final Locus locus = Locus.peek();
336                    cacheMgr.execute(
337                        new SegmentCacheManager.Command<Void>() {
338                            public Void call() throws Exception {
339                                SegmentCacheIndex index =
340                                    cacheMgr.getIndexRegistry()
341                                    .getIndex(segmentWithData.getStar());
342                                boolean added = index.add(
343                                    segmentWithData.getHeader(),
344                                    true,
345                                    response.converterMap.get(
346                                        SegmentCacheIndexImpl
347                                            .makeConverterKey(
348                                                segmentWithData.getHeader())));
349                                if (added) {
350                                    index.loadSucceeded(
351                                        segmentWithData.getHeader(), body);
352                                }
353                                return null;
354                            }
355                            public Locus getLocus() {
356                                return locus;
357                            }
358                        });
359                }
360            }
361
362            // Wait for SQL statements to end -- but only if there are no
363            // failures.
364            //
365            // If there are failures, and its the first iteration, it's more
366            // urgent that we create and execute a follow-up request. We will
367            // wait for the pending SQL statements at the end of that.
368            //
369            // If there are failures on later iterations, wait for SQL
370            // statements to end. The cache might be porous. SQL might be the
371            // only way to make progress.
372            sqlSegmentMapFutures.addAll(response.sqlSegmentMapFutures);
373            if (failureCount == 0 || iteration > 0) {
374                // Wait on segments being loaded by someone else.
375                for (Map.Entry<SegmentHeader, Future<SegmentBody>> entry
376                    : response.futures.entrySet())
377                {
378                    final SegmentHeader header = entry.getKey();
379                    final Future<SegmentBody> bodyFuture = entry.getValue();
380                    final SegmentBody body = Util.safeGet(
381                        bodyFuture,
382                        "Waiting for someone else's segment to load via SQL");
383                    final SegmentWithData segmentWithData =
384                        response.convert(header, body);
385                    segmentWithData.getStar().register(segmentWithData);
386                }
387
388                // Wait on segments being loaded by SQL statements we asked for.
389                for (Future<Map<Segment, SegmentWithData>> sqlSegmentMapFuture
390                    : sqlSegmentMapFutures)
391                {
392                    final Map<Segment, SegmentWithData> segmentMap =
393                        Util.safeGet(
394                            sqlSegmentMapFuture,
395                            "Waiting for segment to load via SQL");
396                    for (SegmentWithData segmentWithData : segmentMap.values())
397                    {
398                        segmentWithData.getStar().register(segmentWithData);
399                    }
400                    // TODO: also pass back SegmentHeader and SegmentBody,
401                    // and add these to headerBodies. Might help?
402                }
403            }
404
405            if (failureCount == 0) {
406                break;
407            }
408
409            // Figure out which cell requests are not satisfied by any of the
410            // segments retrieved.
411            @SuppressWarnings("unchecked")
412            List<CellRequest> old = new ArrayList<CellRequest>(cellRequests1);
413            cellRequests1.clear();
414            for (CellRequest cellRequest : old) {
415                if (cellRequest.getMeasure().getStar()
416                    .getCellFromCache(cellRequest, null) == null)
417                {
418                    cellRequests1.add(cellRequest);
419                }
420            }
421
422            if (cellRequests1.isEmpty()) {
423                break;
424            }
425
426            if (cellRequests1.size() >= old.size()
427                && iteration > 10)
428            {
429                throw Util.newError(
430                    "Cache round-trip did not resolve any cell requests. "
431                    + "Iteration #" + iteration
432                    + "; request count " + cellRequests1.size()
433                    + "; requested headers: " + response.cacheSegments.size()
434                    + "; requested rollups: " + response.rollups.size()
435                    + "; requested SQL: "
436                    + response.sqlSegmentMapFutures.size());
437            }
438
439            // Continue loop; form and execute a new request with the smaller
440            // set of cell requests.
441        }
442
443        dirty = false;
444        cellRequests.clear();
445        return true;
446    }
447
448    /**
449     * Finds a segment-list among a list of candidate segment-lists
450     * for which the bodies of all segments are in cache. Returns a map
451     * from segment-to-body if found, or null if not found.
452     *
453     * @param headerBodies Cache of bodies previously retrieved from external
454     *                     cache
455     *
456     * @param rollup       Specifies what segments to roll up, and the
457     *                     target dimensionality
458     *
459     * @return Collection of segment headers and bodies suitable for rollup,
460     * or null
461     */
462    private Map<SegmentHeader, SegmentBody> findResidentRollupCandidate(
463        Map<SegmentHeader, SegmentBody> headerBodies,
464        BatchLoader.RollupInfo rollup)
465    {
466        candidateLoop:
467        for (List<SegmentHeader> headers : rollup.candidateLists) {
468            final Map<SegmentHeader, SegmentBody> map =
469                new HashMap<SegmentHeader, SegmentBody>();
470            for (SegmentHeader header : headers) {
471                SegmentBody body = loadSegmentFromCache(headerBodies, header);
472                if (body == null) {
473                    // To proceed with a candidate, require all headers to
474                    // be in cache.
475                    continue candidateLoop;
476                }
477                map.put(header, body);
478            }
479            return map;
480        }
481        return null;
482    }
483
484    private SegmentBody loadSegmentFromCache(
485        Map<SegmentHeader, SegmentBody> headerBodies,
486        SegmentHeader header)
487    {
488        SegmentBody body = headerBodies.get(header);
489        if (body != null) {
490            return body;
491        }
492        body = cacheMgr.compositeCache.get(header);
493        if (body == null) {
494            if (cube.getStar() != null) {
495                cacheMgr.remove(cube.getStar(), header);
496            }
497            return null;
498        }
499        headerBodies.put(header, body);
500        return body;
501    }
502
503    /**
504     * Returns the SQL dialect. Overridden in some unit tests.
505     *
506     * @return Dialect
507     */
508    Dialect getDialect() {
509        final RolapStar star = cube.getStar();
510        if (star != null) {
511            return star.getSqlQueryDialect();
512        } else {
513            return cube.getSchema().getDialect();
514        }
515    }
516
517    /**
518     * Sets the flag indicating that the reader has told a lie.
519     */
520    void setDirty(boolean dirty) {
521        this.dirty = dirty;
522    }
523
524}
525
526/**
527 * Context for processing a request to the cache manager for segments matching a
528 * collection of cell requests. All methods except the constructor are executed
529 * by the cache manager's dedicated thread.
530 */
531class BatchLoader {
532    private static final Logger LOGGER =
533        Logger.getLogger(FastBatchingCellReader.class);
534
535    private final Locus locus;
536    private final SegmentCacheManager cacheMgr;
537    private final Dialect dialect;
538    private final RolapCube cube;
539
540    private final Map<AggregationKey, Batch> batches =
541        new HashMap<AggregationKey, Batch>();
542
543    private final Set<SegmentHeader> cacheHeaders =
544        new LinkedHashSet<SegmentHeader>();
545
546    private final Map<SegmentHeader, Future<SegmentBody>> futures =
547        new HashMap<SegmentHeader, Future<SegmentBody>>();
548
549    private final List<RollupInfo> rollups = new ArrayList<RollupInfo>();
550
551    private final Set<BitKey> rollupBitmaps = new HashSet<BitKey>();
552
553    private final Map<List, SegmentBuilder.SegmentConverter> converterMap =
554        new HashMap<List, SegmentBuilder.SegmentConverter>();
555
556    public BatchLoader(
557        Locus locus,
558        SegmentCacheManager cacheMgr,
559        Dialect dialect,
560        RolapCube cube)
561    {
562        this.locus = locus;
563        this.cacheMgr = cacheMgr;
564        this.dialect = dialect;
565        this.cube = cube;
566    }
567
568    final boolean shouldUseGroupingFunction() {
569        return MondrianProperties.instance().EnableGroupingSets.get()
570            && dialect.supportsGroupingSets();
571    }
572
573    private void recordCellRequest2(final CellRequest request) {
574        // If there is a segment matching these criteria, write it to the list
575        // of found segments, and remove the cell request from the list.
576        final AggregationKey key = new AggregationKey(request);
577
578        final SegmentBuilder.SegmentConverterImpl converter =
579                new SegmentBuilder.SegmentConverterImpl(key, request);
580
581        boolean success =
582            loadFromCaches(request, key, converter);
583        // Skip the batch if we already have a rollup for it.
584        if (rollupBitmaps.contains(request.getConstrainedColumnsBitKey())) {
585            return;
586        }
587
588        // As a last resort, we load from SQL.
589        if (!success) {
590            loadFromSql(request, key, converter);
591        }
592    }
593
594    /**
595     * Loads a cell from caches. If the cell is successfully loaded,
596     * we return true.
597     */
598    private boolean loadFromCaches(
599        final CellRequest request,
600        final AggregationKey key,
601        final SegmentBuilder.SegmentConverterImpl converter)
602    {
603        if (MondrianProperties.instance().DisableCaching.get()) {
604            // Caching is disabled. Return always false.
605            return false;
606        }
607
608        // Is request matched by one of the headers we intend to load?
609        final Map<String, Comparable> mappedCellValues =
610            request.getMappedCellValues();
611        final List<String> compoundPredicates =
612            AggregationKey.getCompoundPredicateStringList(
613                key.getStar(),
614                key.getCompoundPredicateList());
615
616        for (SegmentHeader header : cacheHeaders) {
617            if (SegmentCacheIndexImpl.matches(
618                    header,
619                    mappedCellValues,
620                    compoundPredicates))
621            {
622                // It's likely that the header will be in the cache, so this
623                // request will be satisfied. If not, the header will be removed
624                // from the segment index, and we'll be back.
625                return true;
626            }
627        }
628        final RolapStar.Measure measure = request.getMeasure();
629        final RolapStar star = measure.getStar();
630        final RolapSchema schema = star.getSchema();
631        final SegmentCacheIndex index =
632            cacheMgr.getIndexRegistry().getIndex(star);
633        final List<SegmentHeader> headersInCache =
634            index.locate(
635                schema.getName(),
636                schema.getChecksum(),
637                measure.getCubeName(),
638                measure.getName(),
639                star.getFactTable().getAlias(),
640                request.getConstrainedColumnsBitKey(),
641                mappedCellValues,
642                compoundPredicates);
643
644        // Ask for the first segment to be loaded from cache. (If it's no longer
645        // in cache, we'll be back, and presumably we'll try the second
646        // segment.)
647
648        if (!headersInCache.isEmpty()) {
649            final SegmentHeader headerInCache = headersInCache.get(0);
650
651            final Future<SegmentBody> future =
652                index.getFuture(locus.execution, headerInCache);
653
654            if (future != null) {
655                // Segment header is in cache, body is being loaded. Worker will
656                // need to wait for load to complete.
657                futures.put(headerInCache, future);
658            } else {
659                // Segment is in cache.
660                cacheHeaders.add(headerInCache);
661            }
662            index.setConverter(
663                headerInCache.schemaName,
664                headerInCache.schemaChecksum,
665                headerInCache.cubeName,
666                headerInCache.rolapStarFactTableName,
667                headerInCache.measureName,
668                headerInCache.compoundPredicates,
669                converter);
670            converterMap.put(
671                SegmentCacheIndexImpl.makeConverterKey(request, key),
672                converter);
673            return true;
674        }
675
676        // Try to roll up if the measure's rollup aggregator supports
677        // "fast" aggregation from raw objects.
678        //
679        // Do not try to roll up if this request has already chosen a rollup
680        // with the same target dimensionality. It is quite likely that the
681        // other rollup will satisfy this request, and it's complicated to be
682        // 100% sure. If we're wrong, we'll be back.
683
684        // Also make sure that we don't try to rollup a measure which
685        // doesn't support rollup from raw data, like a distinct count
686        // for example. Both the measure's aggregator and its rollup
687        // aggregator must support raw data aggregation. We call
688        // Aggregator.supportsFastAggregates() to verify.
689        if (MondrianProperties.instance()
690                .EnableInMemoryRollup.get()
691            && measure.getAggregator().supportsFastAggregates(
692                measure.getDatatype())
693            && measure.getAggregator().getRollup().supportsFastAggregates(
694                measure.getDatatype())
695            && !isRequestCoveredByRollups(request))
696        {
697            // Don't even bother doing a segment lookup if we can't
698            // rollup that measure.
699            final List<List<SegmentHeader>> rollup =
700                index.findRollupCandidates(
701                    schema.getName(),
702                    schema.getChecksum(),
703                    measure.getCubeName(),
704                    measure.getName(),
705                    star.getFactTable().getAlias(),
706                    request.getConstrainedColumnsBitKey(),
707                    mappedCellValues,
708                    AggregationKey.getCompoundPredicateStringList(
709                        star,
710                        key.getCompoundPredicateList()));
711            if (!rollup.isEmpty()) {
712                rollups.add(
713                    new RollupInfo(
714                        request,
715                        rollup));
716                rollupBitmaps.add(request.getConstrainedColumnsBitKey());
717                converterMap.put(
718                    SegmentCacheIndexImpl.makeConverterKey(request, key),
719                    new SegmentBuilder.StarSegmentConverter(
720                        measure,
721                        key.getCompoundPredicateList()));
722                return true;
723            }
724        }
725        return false;
726    }
727
728      /**
729       * Checks if the request can be satisfied by a rollup already in place
730       * and moves that rollup to the top of the list if not there.
731       */
732      private boolean isRequestCoveredByRollups(CellRequest request) {
733          BitKey bitKey = request.getConstrainedColumnsBitKey();
734          if (!rollupBitmaps.contains(bitKey)) {
735              return false;
736          }
737          List<SegmentHeader> firstOkList = null;
738          for (RollupInfo rollupInfo : rollups) {
739              if (!rollupInfo.constrainedColumnsBitKey.equals(bitKey)) {
740                  continue;
741              }
742              int candidateListsIdx = 0;
743              // bitkey is the same, are the constrained values compatible?
744              candidatesLoop:
745                  for (List<SegmentHeader> candList
746                      : rollupInfo.candidateLists)
747                  {
748                      for (SegmentHeader header : candList) {
749                          if (headerCoversRequest(header, request)) {
750                              firstOkList = candList;
751                              break candidatesLoop;
752                          }
753                      }
754                      candidateListsIdx++;
755                  }
756              if (firstOkList != null) {
757                  if (candidateListsIdx > 0) {
758                      // move good candidate list to first position
759                      rollupInfo.candidateLists.remove(candidateListsIdx);
760                      rollupInfo.candidateLists.set(0, firstOkList);
761                  }
762                  return true;
763              }
764          }
765          return false;
766      }
767
768      /**
769       * Check constraint compatibility
770       */
771      private boolean headerCoversRequest(
772          SegmentHeader header,
773          CellRequest request)
774      {
775          BitKey bitKey = request.getConstrainedColumnsBitKey();
776          assert header.getConstrainedColumnsBitKey().cardinality()
777                >= bitKey.cardinality();
778          BitKey headerBitKey = header.getConstrainedColumnsBitKey();
779          // get all constrained values for relevant bitKey positions
780          List<SortedSet<Comparable>> headerValues =
781              new ArrayList<SortedSet<Comparable>>(bitKey.cardinality());
782          Map<Integer, Integer> valueIndexes = new HashMap<Integer, Integer>();
783          int relevantCCIdx = 0, keyValuesIdx = 0;
784          for (int bitPos : headerBitKey) {
785              if (bitKey.get(bitPos)) {
786                  headerValues.add(
787                      header.getConstrainedColumns().get(relevantCCIdx).values);
788                  valueIndexes.put(bitPos, keyValuesIdx++);
789              }
790              relevantCCIdx++;
791          }
792          assert request.getConstrainedColumns().length
793              == request.getSingleValues().length;
794          // match header constraints against request values
795          for (int i = 0; i < request.getConstrainedColumns().length; i++) {
796              RolapStar.Column col = request.getConstrainedColumns()[i];
797              Integer valueIdx = valueIndexes.get(col.getBitPosition());
798              if (headerValues.get(valueIdx) != null
799                  && !headerValues.get(valueIdx).contains(
800                      request.getSingleValues()[i]))
801              {
802                return false;
803              }
804          }
805          return true;
806      }
807
808    private void loadFromSql(
809        final CellRequest request,
810        final AggregationKey key,
811        final SegmentBuilder.SegmentConverterImpl converter)
812    {
813        // Finally, add to a batch. It will turn in to a SQL request.
814        Batch batch = batches.get(key);
815        if (batch == null) {
816            batch = new Batch(request);
817            batches.put(key, batch);
818            converterMap.put(
819                SegmentCacheIndexImpl.makeConverterKey(request, key),
820                converter);
821
822            if (LOGGER.isDebugEnabled()) {
823                StringBuilder buf = new StringBuilder(100);
824                buf.append("FastBatchingCellReader: bitkey=");
825                buf.append(request.getConstrainedColumnsBitKey());
826                buf.append(Util.nl);
827
828                for (RolapStar.Column column
829                    : request.getConstrainedColumns())
830                {
831                    buf.append("  ");
832                    buf.append(column);
833                    buf.append(Util.nl);
834                }
835                LOGGER.debug(buf.toString());
836            }
837        }
838        batch.add(request);
839    }
840
841    /**
842     * Determines which segments need to be loaded from external cache,
843     * created using roll up, or created using SQL to satisfy a given list
844     * of cell requests.
845     *
846     * @return List of segment futures. Each segment future may or may not be
847     *    already present (it depends on the current location of the segment
848     *    body). Each future will return a not-null segment (or throw).
849     */
850    LoadBatchResponse load(List<CellRequest> cellRequests) {
851        // Check for cancel/timeout. The request might have been on the queue
852        // for a while.
853        if (locus.execution != null) {
854            locus.execution.checkCancelOrTimeout();
855        }
856
857        final long t1 = System.currentTimeMillis();
858
859        // Now we're inside the cache manager, we can see which of our cell
860        // requests can be answered from cache. Those that can will be added
861        // to the segments list; those that can not will be converted into
862        // batches and rolled up or loaded using SQL.
863        for (CellRequest cellRequest : cellRequests) {
864            recordCellRequest2(cellRequest);
865        }
866
867        // Sort the batches into deterministic order.
868        List<Batch> batchList =
869            new ArrayList<Batch>(batches.values());
870        Collections.sort(batchList, BatchComparator.instance);
871        final List<Future<Map<Segment, SegmentWithData>>> segmentMapFutures =
872            new ArrayList<Future<Map<Segment, SegmentWithData>>>();
873        if (shouldUseGroupingFunction()) {
874            LOGGER.debug("Using grouping sets");
875            List<CompositeBatch> groupedBatches = groupBatches(batchList);
876            for (CompositeBatch batch : groupedBatches) {
877                batch.load(segmentMapFutures);
878            }
879        } else {
880            // Load batches in turn.
881            for (Batch batch : batchList) {
882                batch.loadAggregation(segmentMapFutures);
883            }
884        }
885
886        if (LOGGER.isDebugEnabled()) {
887            final long t2 = System.currentTimeMillis();
888            LOGGER.debug("load (millis): " + (t2 - t1));
889        }
890
891        // Create a response and return it to the client. The response is a
892        // bunch of work to be done (waiting for segments to load from SQL, to
893        // come from cache, and so forth) on the client's time. Some of the bets
894        // may not come off, in which case, the client will send us another
895        // request.
896        return new LoadBatchResponse(
897            cellRequests,
898            new ArrayList<SegmentHeader>(cacheHeaders),
899            rollups,
900            converterMap,
901            segmentMapFutures,
902            futures);
903    }
904
905    static List<CompositeBatch> groupBatches(List<Batch> batchList) {
906        Map<AggregationKey, CompositeBatch> batchGroups =
907            new HashMap<AggregationKey, CompositeBatch>();
908        for (int i = 0; i < batchList.size(); i++) {
909            for (int j = i + 1; j < batchList.size();) {
910                final Batch iBatch = batchList.get(i);
911                final Batch jBatch = batchList.get(j);
912                if (iBatch.canBatch(jBatch)) {
913                    batchList.remove(j);
914                    addToCompositeBatch(batchGroups, iBatch, jBatch);
915                } else if (jBatch.canBatch(iBatch)) {
916                    batchList.set(i, jBatch);
917                    batchList.remove(j);
918                    addToCompositeBatch(batchGroups, jBatch, iBatch);
919                    j = i + 1;
920                } else {
921                    j++;
922                }
923            }
924        }
925
926        wrapNonBatchedBatchesWithCompositeBatches(batchList, batchGroups);
927        final CompositeBatch[] compositeBatches =
928            batchGroups.values().toArray(
929                new CompositeBatch[batchGroups.size()]);
930        Arrays.sort(compositeBatches, CompositeBatchComparator.instance);
931        return Arrays.asList(compositeBatches);
932    }
933
934    private static void wrapNonBatchedBatchesWithCompositeBatches(
935        List<Batch> batchList,
936        Map<AggregationKey, CompositeBatch> batchGroups)
937    {
938        for (Batch batch : batchList) {
939            if (batchGroups.get(batch.batchKey) == null) {
940                batchGroups.put(batch.batchKey, new CompositeBatch(batch));
941            }
942        }
943    }
944
945    static void addToCompositeBatch(
946        Map<AggregationKey, CompositeBatch> batchGroups,
947        Batch detailedBatch,
948        Batch summaryBatch)
949    {
950        CompositeBatch compositeBatch = batchGroups.get(detailedBatch.batchKey);
951
952        if (compositeBatch == null) {
953            compositeBatch = new CompositeBatch(detailedBatch);
954            batchGroups.put(detailedBatch.batchKey, compositeBatch);
955        }
956
957        CompositeBatch compositeBatchOfSummaryBatch =
958            batchGroups.remove(summaryBatch.batchKey);
959
960        if (compositeBatchOfSummaryBatch != null) {
961            compositeBatch.merge(compositeBatchOfSummaryBatch);
962        } else {
963            compositeBatch.add(summaryBatch);
964        }
965    }
966
967    /**
968     * Command that loads the segments required for a collection of cell
969     * requests. Returns the collection of segments.
970     */
971    public static class LoadBatchCommand
972        implements SegmentCacheManager.Command<LoadBatchResponse>
973    {
974        private final Locus locus;
975        private final SegmentCacheManager cacheMgr;
976        private final Dialect dialect;
977        private final RolapCube cube;
978        private final List<CellRequest> cellRequests;
979        private final Map<String, Object> mdc =
980            new HashMap<String, Object>();
981
982        public LoadBatchCommand(
983            Locus locus,
984            SegmentCacheManager cacheMgr,
985            Dialect dialect,
986            RolapCube cube,
987            List<CellRequest> cellRequests)
988        {
989            this.locus = locus;
990            this.cacheMgr = cacheMgr;
991            this.dialect = dialect;
992            this.cube = cube;
993            this.cellRequests = cellRequests;
994            if (MDC.getContext() != null) {
995                this.mdc.putAll(MDC.getContext());
996            }
997        }
998
999        public LoadBatchResponse call() {
1000            if (MDC.getContext() != null) {
1001                final Map<String, Object> old = MDC.getContext();
1002                old.clear();
1003                old.putAll(mdc);
1004            }
1005            return new BatchLoader(locus, cacheMgr, dialect, cube)
1006                .load(cellRequests);
1007        }
1008
1009        public Locus getLocus() {
1010            return locus;
1011        }
1012    }
1013
1014    /**
1015     * Set of Batches which can grouped together.
1016     */
1017    static class CompositeBatch {
1018        /** Batch with most number of constraint columns */
1019        final Batch detailedBatch;
1020
1021        /** Batches whose data can be fetched using rollup on detailed batch */
1022        final List<Batch> summaryBatches = new ArrayList<Batch>();
1023
1024        CompositeBatch(Batch detailedBatch) {
1025            this.detailedBatch = detailedBatch;
1026        }
1027
1028        void add(Batch summaryBatch) {
1029            summaryBatches.add(summaryBatch);
1030        }
1031
1032        void merge(CompositeBatch summaryBatch) {
1033            summaryBatches.add(summaryBatch.detailedBatch);
1034            summaryBatches.addAll(summaryBatch.summaryBatches);
1035        }
1036
1037        public void load(
1038            List<Future<Map<Segment, SegmentWithData>>> segmentFutures)
1039        {
1040            GroupingSetsCollector batchCollector =
1041                new GroupingSetsCollector(true);
1042            this.detailedBatch.loadAggregation(batchCollector, segmentFutures);
1043
1044            int cellRequestCount = 0;
1045            for (Batch batch : summaryBatches) {
1046                batch.loadAggregation(batchCollector, segmentFutures);
1047                cellRequestCount += batch.cellRequestCount;
1048            }
1049
1050            getSegmentLoader().load(
1051                cellRequestCount,
1052                batchCollector.getGroupingSets(),
1053                detailedBatch.batchKey.getCompoundPredicateList(),
1054                segmentFutures);
1055        }
1056
1057        SegmentLoader getSegmentLoader() {
1058            return new SegmentLoader(detailedBatch.getCacheMgr());
1059        }
1060    }
1061
1062    private static final Logger BATCH_LOGGER = Logger.getLogger(Batch.class);
1063
1064    public static class RollupInfo {
1065        final RolapStar.Column[] constrainedColumns;
1066        final BitKey constrainedColumnsBitKey;
1067        final RolapStar.Measure measure;
1068        final List<List<SegmentHeader>> candidateLists;
1069
1070        RollupInfo(
1071            CellRequest request,
1072            List<List<SegmentHeader>> candidateLists)
1073        {
1074            this.candidateLists = candidateLists;
1075            constrainedColumns = request.getConstrainedColumns();
1076            constrainedColumnsBitKey = request.getConstrainedColumnsBitKey();
1077            measure = request.getMeasure();
1078        }
1079    }
1080
1081    /**
1082     * Request sent from cache manager to a worker to load segments into
1083     * the cache, create segments by rolling up, and to wait for segments
1084     * being loaded via SQL.
1085     */
1086    static class LoadBatchResponse {
1087        /**
1088         * List of segments that are being loaded using SQL.
1089         *
1090         * <p>Other workers are executing the SQL. When done, they will write a
1091         * segment body or an error into the respective futures. The thread
1092         * processing this request will wait on those futures, once all segments
1093         * have successfully arrived from cache.</p>
1094         */
1095        final List<Future<Map<Segment, SegmentWithData>>> sqlSegmentMapFutures;
1096
1097        /**
1098         * List of segments we are trying to load from the cache.
1099         */
1100        final List<SegmentHeader> cacheSegments;
1101
1102        /**
1103         * List of cell requests that will be satisfied by segments we are
1104         * trying to load from the cache (or create by rolling up).
1105         */
1106        final List<CellRequest> cellRequests;
1107
1108        /**
1109         * List of segments to be created from segments in the cache, provided
1110         * that the cache segments come through.
1111         *
1112         * <p>If they do not, we will need to tell the cache manager to remove
1113         * the pending segments.</p>
1114         */
1115        final List<RollupInfo> rollups;
1116
1117        final Map<List, SegmentBuilder.SegmentConverter> converterMap;
1118
1119        final Map<SegmentHeader, Future<SegmentBody>> futures;
1120
1121        LoadBatchResponse(
1122            List<CellRequest> cellRequests,
1123            List<SegmentHeader> cacheSegments,
1124            List<RollupInfo> rollups,
1125            Map<List, SegmentBuilder.SegmentConverter> converterMap,
1126            List<Future<Map<Segment, SegmentWithData>>> sqlSegmentMapFutures,
1127            Map<SegmentHeader, Future<SegmentBody>> futures)
1128        {
1129            this.cellRequests = cellRequests;
1130            this.sqlSegmentMapFutures = sqlSegmentMapFutures;
1131            this.cacheSegments = cacheSegments;
1132            this.rollups = rollups;
1133            this.converterMap = converterMap;
1134            this.futures = futures;
1135        }
1136
1137        public SegmentWithData convert(
1138            SegmentHeader header,
1139            SegmentBody body)
1140        {
1141            final SegmentBuilder.SegmentConverter converter =
1142                converterMap.get(
1143                    SegmentCacheIndexImpl.makeConverterKey(header));
1144            return converter.convert(header, body);
1145        }
1146    }
1147
1148    public class Batch {
1149        // the CellRequest's constrained columns
1150        final RolapStar.Column[] columns;
1151        final List<RolapStar.Measure> measuresList =
1152            new ArrayList<RolapStar.Measure>();
1153        final Set<StarColumnPredicate>[] valueSets;
1154        final AggregationKey batchKey;
1155        // string representation; for debug; set lazily in toString
1156        private String string;
1157        private int cellRequestCount;
1158        private List<StarColumnPredicate[]> tuples =
1159            new ArrayList<StarColumnPredicate[]>();
1160
1161        public Batch(CellRequest request) {
1162            columns = request.getConstrainedColumns();
1163            valueSets = new HashSet[columns.length];
1164            for (int i = 0; i < valueSets.length; i++) {
1165                valueSets[i] = new HashSet<StarColumnPredicate>();
1166            }
1167            batchKey = new AggregationKey(request);
1168        }
1169
1170        public String toString() {
1171            if (string == null) {
1172                final StringBuilder buf = new StringBuilder();
1173                buf.append("Batch {\n")
1174                    .append("  columns={").append(Arrays.toString(columns))
1175                    .append("}\n")
1176                    .append("  measures={").append(measuresList).append("}\n")
1177                    .append("  valueSets={").append(Arrays.toString(valueSets))
1178                    .append("}\n")
1179                    .append("  batchKey=").append(batchKey).append("}\n")
1180                    .append("}");
1181                string = buf.toString();
1182            }
1183            return string;
1184        }
1185
1186        public final void add(CellRequest request) {
1187            ++cellRequestCount;
1188            final int valueCount = request.getNumValues();
1189            final StarColumnPredicate[] tuple =
1190                new StarColumnPredicate[valueCount];
1191            for (int j = 0; j < valueCount; j++) {
1192                final StarColumnPredicate value = request.getValueAt(j);
1193                valueSets[j].add(value);
1194                tuple[j] = value;
1195            }
1196            tuples.add(tuple);
1197            final RolapStar.Measure measure = request.getMeasure();
1198            if (!measuresList.contains(measure)) {
1199                assert (measuresList.size() == 0)
1200                       || (measure.getStar()
1201                           == (measuresList.get(0)).getStar())
1202                    : "Measure must belong to same star as other measures";
1203                measuresList.add(measure);
1204            }
1205        }
1206
1207        /**
1208         * Returns the RolapStar associated with the Batch's first Measure.
1209         *
1210         * <p>This method can only be called after the {@link #add} method has
1211         * been called.
1212         *
1213         * @return the RolapStar associated with the Batch's first Measure
1214         */
1215        private RolapStar getStar() {
1216            RolapStar.Measure measure = measuresList.get(0);
1217            return measure.getStar();
1218        }
1219
1220        public BitKey getConstrainedColumnsBitKey() {
1221            return batchKey.getConstrainedColumnsBitKey();
1222        }
1223
1224        public SegmentCacheManager getCacheMgr() {
1225            return cacheMgr;
1226        }
1227
1228        public final void loadAggregation(
1229            List<Future<Map<Segment, SegmentWithData>>> segmentFutures)
1230        {
1231            GroupingSetsCollector collectorWithGroupingSetsTurnedOff =
1232                new GroupingSetsCollector(false);
1233            loadAggregation(collectorWithGroupingSetsTurnedOff, segmentFutures);
1234        }
1235
1236        final void loadAggregation(
1237            GroupingSetsCollector groupingSetsCollector,
1238            List<Future<Map<Segment, SegmentWithData>>> segmentFutures)
1239        {
1240            if (MondrianProperties.instance().GenerateAggregateSql.get()) {
1241                generateAggregateSql();
1242            }
1243            final StarColumnPredicate[] predicates = initPredicates();
1244            final long t1 = System.currentTimeMillis();
1245
1246            // TODO: optimize key sets; drop a constraint if more than x% of
1247            // the members are requested; whether we should get just the cells
1248            // requested or expand to a n-cube
1249
1250            // If the database cannot execute "count(distinct ...)", split the
1251            // distinct aggregations out.
1252            int distinctMeasureCount = getDistinctMeasureCount(measuresList);
1253            boolean tooManyDistinctMeasures =
1254                distinctMeasureCount > 0
1255                && !dialect.allowsCountDistinct()
1256                || distinctMeasureCount > 1
1257                   && !dialect.allowsMultipleCountDistinct();
1258
1259            if (tooManyDistinctMeasures) {
1260                doSpecialHandlingOfDistinctCountMeasures(
1261                    predicates,
1262                    groupingSetsCollector,
1263                    segmentFutures);
1264            }
1265
1266            // Load agg(distinct <SQL expression>) measures individually
1267            // for DBs that does allow multiple distinct SQL measures.
1268            if (!dialect.allowsMultipleDistinctSqlMeasures()) {
1269                // Note that the intention was originally to capture the
1270                // subquery SQL measures and separate them out; However,
1271                // without parsing the SQL string, Mondrian cannot distinguish
1272                // between "col1" + "col2" and subquery. Here the measure list
1273                // contains both types.
1274
1275                // See the test case testLoadDistinctSqlMeasure() in
1276                //  mondrian.rolap.FastBatchingCellReaderTest
1277
1278                List<RolapStar.Measure> distinctSqlMeasureList =
1279                    getDistinctSqlMeasures(measuresList);
1280                for (RolapStar.Measure measure : distinctSqlMeasureList) {
1281                    AggregationManager.loadAggregation(
1282                        cacheMgr,
1283                        cellRequestCount,
1284                        Collections.singletonList(measure),
1285                        columns,
1286                        batchKey,
1287                        predicates,
1288                        groupingSetsCollector,
1289                        segmentFutures);
1290                    measuresList.remove(measure);
1291                }
1292            }
1293
1294            final int measureCount = measuresList.size();
1295            if (measureCount > 0) {
1296                AggregationManager.loadAggregation(
1297                    cacheMgr,
1298                    cellRequestCount,
1299                    measuresList,
1300                    columns,
1301                    batchKey,
1302                    predicates,
1303                    groupingSetsCollector,
1304                    segmentFutures);
1305            }
1306
1307            if (BATCH_LOGGER.isDebugEnabled()) {
1308                final long t2 = System.currentTimeMillis();
1309                BATCH_LOGGER.debug(
1310                    "Batch.load (millis) " + (t2 - t1));
1311            }
1312        }
1313
1314        private void doSpecialHandlingOfDistinctCountMeasures(
1315            StarColumnPredicate[] predicates,
1316            GroupingSetsCollector groupingSetsCollector,
1317            List<Future<Map<Segment, SegmentWithData>>> segmentFutures)
1318        {
1319            while (true) {
1320                // Scan for a measure based upon a distinct aggregation.
1321                final RolapStar.Measure distinctMeasure =
1322                    getFirstDistinctMeasure(measuresList);
1323                if (distinctMeasure == null) {
1324                    break;
1325                }
1326                final String expr =
1327                    distinctMeasure.getExpression().getGenericExpression();
1328                final List<RolapStar.Measure> distinctMeasuresList =
1329                    new ArrayList<RolapStar.Measure>();
1330                for (int i = 0; i < measuresList.size();) {
1331                    final RolapStar.Measure measure = measuresList.get(i);
1332                    if (measure.getAggregator().isDistinct()
1333                        && measure.getExpression().getGenericExpression()
1334                        .equals(expr))
1335                    {
1336                        measuresList.remove(i);
1337                        distinctMeasuresList.add(distinctMeasure);
1338                    } else {
1339                        i++;
1340                    }
1341                }
1342
1343                // Load all the distinct measures based on the same expression
1344                // together
1345                AggregationManager.loadAggregation(
1346                    cacheMgr,
1347                    cellRequestCount,
1348                    distinctMeasuresList,
1349                    columns,
1350                    batchKey,
1351                    predicates,
1352                    groupingSetsCollector,
1353                    segmentFutures);
1354            }
1355        }
1356
1357        private StarColumnPredicate[] initPredicates() {
1358            StarColumnPredicate[] predicates =
1359                new StarColumnPredicate[columns.length];
1360            for (int j = 0; j < columns.length; j++) {
1361                Set<StarColumnPredicate> valueSet = valueSets[j];
1362
1363                StarColumnPredicate predicate;
1364                if (valueSet == null) {
1365                    predicate = LiteralStarPredicate.FALSE;
1366                } else {
1367                    ValueColumnPredicate[] values =
1368                        valueSet.toArray(
1369                            new ValueColumnPredicate[valueSet.size()]);
1370                    // Sort array to achieve determinism in generated SQL.
1371                    Arrays.sort(
1372                        values,
1373                        ValueColumnConstraintComparator.instance);
1374
1375                    predicate =
1376                        new ListColumnPredicate(
1377                            columns[j],
1378                            Arrays.asList((StarColumnPredicate[]) values));
1379                }
1380
1381                predicates[j] = predicate;
1382            }
1383            return predicates;
1384        }
1385
1386        private void generateAggregateSql() {
1387            if (cube == null || cube.isVirtual()) {
1388                final StringBuilder buf = new StringBuilder(64);
1389                buf.append(
1390                    "AggGen: Sorry, can not create SQL for virtual Cube \"")
1391                    .append(cube == null ? null : cube.getName())
1392                    .append("\", operation not currently supported");
1393                BATCH_LOGGER.error(buf.toString());
1394
1395            } else {
1396                final AggGen aggGen =
1397                    new AggGen(cube.getName(), cube.getStar(), columns);
1398                if (aggGen.isReady()) {
1399                    // PRINT TO STDOUT - DO NOT USE BATCH_LOGGER
1400                    System.out.println(
1401                        "createLost:" + Util.nl + aggGen.createLost());
1402                    System.out.println(
1403                        "insertIntoLost:" + Util.nl + aggGen.insertIntoLost());
1404                    System.out.println(
1405                        "createCollapsed:" + Util.nl
1406                        + aggGen.createCollapsed());
1407                    System.out.println(
1408                        "insertIntoCollapsed:" + Util.nl
1409                        + aggGen.insertIntoCollapsed());
1410                } else {
1411                    BATCH_LOGGER.error("AggGen failed");
1412                }
1413            }
1414        }
1415
1416        /**
1417         * Returns the first measure based upon a distinct aggregation, or null
1418         * if there is none.
1419         */
1420        final RolapStar.Measure getFirstDistinctMeasure(
1421            List<RolapStar.Measure> measuresList)
1422        {
1423            for (RolapStar.Measure measure : measuresList) {
1424                if (measure.getAggregator().isDistinct()) {
1425                    return measure;
1426                }
1427            }
1428            return null;
1429        }
1430
1431        /**
1432         * Returns the number of the measures based upon a distinct
1433         * aggregation.
1434         */
1435        private int getDistinctMeasureCount(
1436            List<RolapStar.Measure> measuresList)
1437        {
1438            int count = 0;
1439            for (RolapStar.Measure measure : measuresList) {
1440                if (measure.getAggregator().isDistinct()) {
1441                    ++count;
1442                }
1443            }
1444            return count;
1445        }
1446
1447        /**
1448         * Returns the list of measures based upon a distinct aggregation
1449         * containing SQL measure expressions(as opposed to column expressions).
1450         *
1451         * This method was initially intended for only those measures that are
1452         * defined using subqueries(for DBs that support them). However, since
1453         * Mondrian does not parse the SQL string, the method will count both
1454         * queries as well as some non query SQL expressions.
1455         */
1456        private List<RolapStar.Measure> getDistinctSqlMeasures(
1457            List<RolapStar.Measure> measuresList)
1458        {
1459            List<RolapStar.Measure> distinctSqlMeasureList =
1460                new ArrayList<RolapStar.Measure>();
1461            for (RolapStar.Measure measure : measuresList) {
1462                if (measure.getAggregator().isDistinct()
1463                    && measure.getExpression() instanceof
1464                        MondrianDef.MeasureExpression)
1465                {
1466                    MondrianDef.MeasureExpression measureExpr =
1467                        (MondrianDef.MeasureExpression) measure.getExpression();
1468                    MondrianDef.SQL measureSql = measureExpr.expressions[0];
1469                    // Checks if the SQL contains "SELECT" to detect the case a
1470                    // subquery is used to define the measure. This is not a
1471                    // perfect check, because a SQL expression on column names
1472                    // containing "SELECT" will also be detected. e,g,
1473                    // count("select beef" + "regular beef").
1474                    if (measureSql.cdata.toUpperCase().contains("SELECT")) {
1475                        distinctSqlMeasureList.add(measure);
1476                    }
1477                }
1478            }
1479            return distinctSqlMeasureList;
1480        }
1481
1482        /**
1483         * Returns whether another Batch can be batched to this Batch.
1484         *
1485         * <p>This is possible if:
1486         * <li>columns list is super set of other batch's constraint columns;
1487         *     and
1488         * <li>both have same Fact Table; and
1489         * <li>matching columns of this and other batch has the same value; and
1490         * <li>non matching columns of this batch have ALL VALUES
1491         * </ul>
1492         */
1493        boolean canBatch(Batch other) {
1494            return hasOverlappingBitKeys(other)
1495                && constraintsMatch(other)
1496                && hasSameMeasureList(other)
1497                && !hasDistinctCountMeasure()
1498                && !other.hasDistinctCountMeasure()
1499                && haveSameStarAndAggregation(other)
1500                && haveSameClosureColumns(other);
1501        }
1502
1503        /**
1504         * Returns whether the constraints on this Batch subsume the constraints
1505         * on another Batch and therefore the other Batch can be subsumed into
1506         * this one for GROUPING SETS purposes. Not symmetric.
1507         *
1508         * @param other Other batch
1509         * @return Whether other batch can be subsumed into this one
1510         */
1511        private boolean constraintsMatch(Batch other) {
1512            if (areBothDistinctCountBatches(other)) {
1513                if (getConstrainedColumnsBitKey().equals(
1514                        other.getConstrainedColumnsBitKey()))
1515                {
1516                    return hasSameCompoundPredicate(other)
1517                        && haveSameValues(other);
1518                } else {
1519                    return hasSameCompoundPredicate(other)
1520                        || (other.batchKey.getCompoundPredicateList().isEmpty()
1521                            || equalConstraint(
1522                                batchKey.getCompoundPredicateList(),
1523                                other.batchKey.getCompoundPredicateList()))
1524                        && haveSameValues(other);
1525                }
1526            } else {
1527                return haveSameValues(other);
1528            }
1529        }
1530
1531        private boolean equalConstraint(
1532            List<StarPredicate> predList1,
1533            List<StarPredicate> predList2)
1534        {
1535            if (predList1.size() != predList2.size()) {
1536                return false;
1537            }
1538            for (int i = 0; i < predList1.size(); i++) {
1539                StarPredicate pred1 = predList1.get(i);
1540                StarPredicate pred2 = predList2.get(i);
1541                if (!pred1.equalConstraint(pred2)) {
1542                    return false;
1543                }
1544            }
1545            return true;
1546        }
1547
1548        private boolean areBothDistinctCountBatches(Batch other) {
1549            return this.hasDistinctCountMeasure()
1550                && !this.hasNormalMeasures()
1551                && other.hasDistinctCountMeasure()
1552                && !other.hasNormalMeasures();
1553        }
1554
1555        private boolean hasNormalMeasures() {
1556            return getDistinctMeasureCount(measuresList)
1557                !=  measuresList.size();
1558        }
1559
1560        private boolean hasSameMeasureList(Batch other) {
1561            return this.measuresList.size() == other.measuresList.size()
1562                   && this.measuresList.containsAll(other.measuresList);
1563        }
1564
1565        boolean hasOverlappingBitKeys(Batch other) {
1566            return getConstrainedColumnsBitKey()
1567                .isSuperSetOf(other.getConstrainedColumnsBitKey());
1568        }
1569
1570        boolean hasDistinctCountMeasure() {
1571            return getDistinctMeasureCount(measuresList) > 0;
1572        }
1573
1574        boolean hasSameCompoundPredicate(Batch other) {
1575            final StarPredicate starPredicate = compoundPredicate();
1576            final StarPredicate otherStarPredicate = other.compoundPredicate();
1577            if (starPredicate == null && otherStarPredicate == null) {
1578                return true;
1579            } else if (starPredicate != null && otherStarPredicate != null) {
1580                return starPredicate.equalConstraint(otherStarPredicate);
1581            }
1582            return false;
1583        }
1584
1585        private StarPredicate compoundPredicate() {
1586            StarPredicate predicate = null;
1587            for (Set<StarColumnPredicate> valueSet : valueSets) {
1588                StarPredicate orPredicate = null;
1589                for (StarColumnPredicate starColumnPredicate : valueSet) {
1590                    if (orPredicate == null) {
1591                        orPredicate = starColumnPredicate;
1592                    } else {
1593                        orPredicate = orPredicate.or(starColumnPredicate);
1594                    }
1595                }
1596                if (predicate == null) {
1597                    predicate = orPredicate;
1598                } else {
1599                    predicate = predicate.and(orPredicate);
1600                }
1601            }
1602            for (StarPredicate starPredicate
1603                : batchKey.getCompoundPredicateList())
1604            {
1605                if (predicate == null) {
1606                    predicate = starPredicate;
1607                } else {
1608                    predicate = predicate.and(starPredicate);
1609                }
1610            }
1611            return predicate;
1612        }
1613
1614        boolean haveSameStarAndAggregation(Batch other) {
1615            boolean rollup[] = {false};
1616            boolean otherRollup[] = {false};
1617
1618            boolean hasSameAggregation =
1619                getAgg(rollup) == other.getAgg(otherRollup);
1620            boolean hasSameRollupOption = rollup[0] == otherRollup[0];
1621
1622            boolean hasSameStar = getStar().equals(other.getStar());
1623            return hasSameStar && hasSameAggregation && hasSameRollupOption;
1624        }
1625
1626        /**
1627         * Returns whether this batch has the same closure columns as another.
1628         *
1629         * <p>Ensures that we do not group together a batch that includes a
1630         * level of a parent-child closure dimension with a batch that does not.
1631         * It is not safe to roll up from a parent-child closure level; due to
1632         * multiple accounting, the 'all' level is less than the sum of the
1633         * members of the closure level.
1634         *
1635         * @param other Other batch
1636         * @return Whether batches have the same closure columns
1637         */
1638        boolean haveSameClosureColumns(Batch other) {
1639            final BitKey cubeClosureColumnBitKey = cube.closureColumnBitKey;
1640            if (cubeClosureColumnBitKey == null) {
1641                // Virtual cubes have a null bitkey. For now, punt; should do
1642                // better.
1643                return true;
1644            }
1645            final BitKey closureColumns =
1646                this.batchKey.getConstrainedColumnsBitKey()
1647                    .and(cubeClosureColumnBitKey);
1648            final BitKey otherClosureColumns =
1649                other.batchKey.getConstrainedColumnsBitKey()
1650                    .and(cubeClosureColumnBitKey);
1651            return closureColumns.equals(otherClosureColumns);
1652        }
1653
1654        /**
1655         * @param rollup Out parameter
1656         * @return AggStar
1657         */
1658        private AggStar getAgg(boolean[] rollup) {
1659            return AggregationManager.findAgg(
1660                getStar(),
1661                getConstrainedColumnsBitKey(),
1662                makeMeasureBitKey(),
1663                rollup);
1664        }
1665
1666        private BitKey makeMeasureBitKey() {
1667            BitKey bitKey = getConstrainedColumnsBitKey().emptyCopy();
1668            for (RolapStar.Measure measure : measuresList) {
1669                bitKey.set(measure.getBitPosition());
1670            }
1671            return bitKey;
1672        }
1673
1674        /**
1675         * Return whether have same values for overlapping columns or
1676         * has all children for others.
1677         */
1678        boolean haveSameValues(
1679            Batch other)
1680        {
1681            for (int j = 0; j < columns.length; j++) {
1682                boolean isCommonColumn = false;
1683                for (int i = 0; i < other.columns.length; i++) {
1684                    if (areSameColumns(other.columns[i], columns[j])) {
1685                        if (hasSameValues(other.valueSets[i], valueSets[j])) {
1686                            isCommonColumn = true;
1687                            break;
1688                        } else {
1689                            return false;
1690                        }
1691                    }
1692                }
1693                if (!isCommonColumn
1694                    && !hasAllValues(columns[j], valueSets[j]))
1695                {
1696                    return false;
1697                }
1698            }
1699            return true;
1700        }
1701
1702        private boolean hasAllValues(
1703            RolapStar.Column column,
1704            Set<StarColumnPredicate> valueSet)
1705        {
1706            return column.getCardinality() == valueSet.size();
1707        }
1708
1709        private boolean areSameColumns(
1710            RolapStar.Column otherColumn,
1711            RolapStar.Column thisColumn)
1712        {
1713            return otherColumn.equals(thisColumn);
1714        }
1715
1716        private boolean hasSameValues(
1717            Set<StarColumnPredicate> otherValueSet,
1718            Set<StarColumnPredicate> thisValueSet)
1719        {
1720            return otherValueSet.equals(thisValueSet);
1721        }
1722    }
1723
1724    private static class CompositeBatchComparator
1725        implements Comparator<CompositeBatch>
1726    {
1727        static final CompositeBatchComparator instance =
1728            new CompositeBatchComparator();
1729
1730        public int compare(CompositeBatch o1, CompositeBatch o2) {
1731            return BatchComparator.instance.compare(
1732                o1.detailedBatch,
1733                o2.detailedBatch);
1734        }
1735    }
1736
1737    private static class BatchComparator implements Comparator<Batch> {
1738        static final BatchComparator instance = new BatchComparator();
1739
1740        private BatchComparator() {
1741        }
1742
1743        public int compare(
1744            Batch o1, Batch o2)
1745        {
1746            if (o1.columns.length != o2.columns.length) {
1747                return o1.columns.length - o2.columns.length;
1748            }
1749            for (int i = 0; i < o1.columns.length; i++) {
1750                int c = o1.columns[i].getName().compareTo(
1751                    o2.columns[i].getName());
1752                if (c != 0) {
1753                    return c;
1754                }
1755            }
1756            for (int i = 0; i < o1.columns.length; i++) {
1757                int c = compare(o1.valueSets[i], o2.valueSets[i]);
1758                if (c != 0) {
1759                    return c;
1760                }
1761            }
1762            return 0;
1763        }
1764
1765        <T> int compare(Set<T> set1, Set<T> set2) {
1766            if (set1.size() != set2.size()) {
1767                return set1.size() - set2.size();
1768            }
1769            Iterator<T> iter1 = set1.iterator();
1770            Iterator<T> iter2 = set2.iterator();
1771            while (iter1.hasNext()) {
1772                T v1 = iter1.next();
1773                T v2 = iter2.next();
1774                int c = Util.compareKey(v1, v2);
1775                if (c != 0) {
1776                    return c;
1777                }
1778            }
1779            return 0;
1780        }
1781    }
1782
1783    private static class ValueColumnConstraintComparator
1784        implements Comparator<ValueColumnPredicate>
1785    {
1786        static final ValueColumnConstraintComparator instance =
1787            new ValueColumnConstraintComparator();
1788
1789        private ValueColumnConstraintComparator() {
1790        }
1791
1792        public int compare(
1793            ValueColumnPredicate o1,
1794            ValueColumnPredicate o2)
1795        {
1796            Object v1 = o1.getValue();
1797            Object v2 = o2.getValue();
1798            if (v1.getClass() == v2.getClass()
1799                && v1 instanceof Comparable)
1800            {
1801                return ((Comparable) v1).compareTo(v2);
1802            } else {
1803                return v1.toString().compareTo(v2.toString());
1804            }
1805        }
1806    }
1807
1808}
1809
1810// End FastBatchingCellReader.java