001/*
002// This software is subject to the terms of the Eclipse Public License v1.0
003// Agreement, available at the following URL:
004// http://www.eclipse.org/legal/epl-v10.html.
005// You must accept the terms of that agreement to use this software.
006//
007// Copyright (C) 2011-2013 Pentaho and others
008// All Rights Reserved.
009*/
010package mondrian.rolap.agg;
011
012import mondrian.olap.Aggregator;
013import mondrian.olap.Util;
014import mondrian.rolap.*;
015import mondrian.rolap.agg.Segment.ExcludedRegion;
016import mondrian.rolap.sql.SqlQuery;
017import mondrian.spi.*;
018import mondrian.spi.Dialect.Datatype;
019import mondrian.util.ArraySortedSet;
020import mondrian.util.Pair;
021
022import java.lang.ref.WeakReference;
023import java.math.BigInteger;
024import java.util.*;
025import java.util.Map.Entry;
026
027/**
028 * Helper class that contains methods to convert between
029 * {@link Segment} and {@link SegmentHeader}, and also
030 * {@link SegmentWithData} and {@link SegmentBody}.
031 *
032 * @author LBoudreau
033 */
034public class SegmentBuilder {
035    /**
036     * Converts a segment plus a {@link SegmentBody} into a
037     * {@link mondrian.rolap.agg.SegmentWithData}.
038     *
039     * @param segment Segment
040     * @param sb Segment body
041     * @return SegmentWithData
042     */
043    public static SegmentWithData addData(Segment segment, SegmentBody sb) {
044        // Load the axis keys for this segment
045        SegmentAxis[] axes =
046            new SegmentAxis[segment.predicates.length];
047        for (int i = 0; i < segment.predicates.length; i++) {
048            StarColumnPredicate predicate =
049                segment.predicates[i];
050            axes[i] =
051                new SegmentAxis(
052                    predicate,
053                    sb.getAxisValueSets()[i],
054                    sb.getNullAxisFlags()[i]);
055        }
056        final SegmentDataset dataSet = createDataset(sb, axes);
057        return new SegmentWithData(segment, dataSet, axes);
058    }
059
060    /**
061     * Creates a SegmentDataset that contains the cached
062     * data and is initialized to be used with the supplied segment.
063     *
064     * @param body Segment with which the returned dataset will be associated
065     * @param axes Segment axes, containing actual column values
066     * @return A SegmentDataset object that contains cached data.
067     */
068    private static SegmentDataset createDataset(
069        SegmentBody body,
070        SegmentAxis[] axes)
071    {
072        final SegmentDataset dataSet;
073        if (body instanceof DenseDoubleSegmentBody) {
074            dataSet =
075                new DenseDoubleSegmentDataset(
076                    axes,
077                    (double[]) body.getValueArray(),
078                    body.getNullValueIndicators());
079        } else if (body instanceof DenseIntSegmentBody) {
080            dataSet =
081                new DenseIntSegmentDataset(
082                    axes,
083                    (int[]) body.getValueArray(),
084                    body.getNullValueIndicators());
085        } else if (body instanceof DenseObjectSegmentBody) {
086            dataSet =
087                new DenseObjectSegmentDataset(
088                    axes, (Object[]) body.getValueArray());
089        } else if (body instanceof SparseSegmentBody) {
090            dataSet = new SparseSegmentDataset(body.getValueMap());
091        } else {
092            throw Util.newInternal(
093                "Unknown segment body type: " + body.getClass() + ": " + body);
094        }
095        return dataSet;
096    }
097
098    /**
099     * Creates a segment from a SegmentHeader. The star,
100     * constrainedColsBitKey, constrainedColumns and measure arguments are a
101     * helping hand, because we know what we were looking for.
102     *
103     * @param header The header to convert.
104     * @param star Star
105     * @param constrainedColumnsBitKey Constrained columns
106     * @param constrainedColumns Constrained columns
107     * @param measure Measure
108     * @return Segment
109     */
110    public static Segment toSegment(
111        SegmentHeader header,
112        RolapStar star,
113        BitKey constrainedColumnsBitKey,
114        RolapStar.Column[] constrainedColumns,
115        RolapStar.Measure measure,
116        List<StarPredicate> compoundPredicates)
117    {
118        final List<StarColumnPredicate> predicateList =
119            new ArrayList<StarColumnPredicate>();
120        for (int i = 0; i < constrainedColumns.length; i++) {
121            RolapStar.Column constrainedColumn = constrainedColumns[i];
122            final SortedSet<Comparable> values =
123                header.getConstrainedColumns().get(i).values;
124            StarColumnPredicate predicate;
125            if (values == null) {
126                predicate =
127                    new LiteralStarPredicate(
128                        constrainedColumn,
129                        true);
130            } else if (values.size() == 1) {
131                predicate =
132                    new ValueColumnPredicate(
133                        constrainedColumn,
134                        values.first());
135            } else {
136                final List<StarColumnPredicate> valuePredicateList =
137                    new ArrayList<StarColumnPredicate>();
138                for (Object value : values) {
139                    valuePredicateList.add(
140                        new ValueColumnPredicate(
141                            constrainedColumn,
142                            value));
143                }
144                predicate =
145                    new ListColumnPredicate(
146                        constrainedColumn,
147                        valuePredicateList);
148            }
149            predicateList.add(predicate);
150        }
151
152        return new Segment(
153            star,
154            constrainedColumnsBitKey,
155            constrainedColumns,
156            measure,
157            predicateList.toArray(
158                new StarColumnPredicate[predicateList.size()]),
159            new ExcludedRegionList(header),
160            compoundPredicates);
161    }
162
163    /**
164     * Given a collection of segments, all of the same dimensionality, rolls up
165     * to create a segment with reduced dimensionality.
166     *
167     * @param map Source segment headers and bodies
168     * @param keepColumns A list of column names to keep as part of
169     * the rolled up segment.
170     * @param targetBitkey The column bit key to match with the
171     * resulting segment.
172     * @param rollupAggregator The aggregator to use to rollup.
173     * @return Segment header and body of requested dimensionality
174     * @param datatype The data type to use.
175     */
176    public static Pair<SegmentHeader, SegmentBody> rollup(
177        Map<SegmentHeader, SegmentBody> map,
178        Set<String> keepColumns,
179        BitKey targetBitkey,
180        Aggregator rollupAggregator,
181        Datatype datatype)
182    {
183        class AxisInfo {
184            SegmentColumn column;
185            SortedSet<Comparable> requestedValues;
186            SortedSet<Comparable> valueSet;
187            Comparable[] values;
188            boolean hasNull;
189            int src;
190            boolean lostPredicate;
191        }
192        final SegmentHeader firstHeader = map.keySet().iterator().next();
193        final AxisInfo[] axes =
194            new AxisInfo[keepColumns.size()];
195        int z = 0, j = 0;
196        for (SegmentColumn column : firstHeader.getConstrainedColumns()) {
197            if (keepColumns.contains(column.columnExpression)) {
198                final AxisInfo axisInfo = axes[z++] = new AxisInfo();
199                axisInfo.src = j;
200                axisInfo.column = column;
201                axisInfo.requestedValues = column.values;
202            }
203            j++;
204        }
205
206        // Compute the sets of values in each axis of the target segment. These
207        // are the intersection of the input axes.
208        for (Map.Entry<SegmentHeader, SegmentBody> entry : map.entrySet()) {
209            final SegmentHeader header = entry.getKey();
210            for (AxisInfo axis : axes) {
211                final SortedSet<Comparable> values =
212                    entry.getValue().getAxisValueSets()[axis.src];
213                final SegmentColumn headerColumn =
214                    header.getConstrainedColumn(axis.column.columnExpression);
215                final boolean hasNull =
216                    entry.getValue().getNullAxisFlags()[axis.src];
217                final SortedSet<Comparable> requestedValues =
218                    headerColumn.getValues();
219                if (axis.valueSet == null) {
220                    axis.valueSet = new TreeSet<Comparable>(values);
221                    axis.hasNull = hasNull;
222                    axis.requestedValues = requestedValues;
223                } else {
224                    final SortedSet<Comparable> filteredValues;
225                    final boolean filteredHasNull;
226                    if (axis.requestedValues == null) {
227                        filteredValues = values;
228                        filteredHasNull = hasNull;
229                    } else {
230                        filteredValues = Util.intersect(
231                            values,
232                            axis.requestedValues);
233
234                        // SegmentColumn predicates cannot ask for the null
235                        // value (at present).
236                        filteredHasNull = false;
237                    }
238                    axis.valueSet.addAll(filteredValues);
239                    axis.hasNull = axis.hasNull || filteredHasNull;
240                    if (!Util.equals(axis.requestedValues, requestedValues)) {
241                        if (axis.requestedValues == null) {
242                            // Downgrade from wildcard to a specific list.
243                            axis.requestedValues = requestedValues;
244                        } else {
245                            // Segment requests have incompatible predicates.
246                            // Best we can say is "we must have asked for the
247                            // values that came back".
248                            axis.lostPredicate = true;
249                        }
250                    }
251                }
252            }
253        }
254
255        for (AxisInfo axis : axes) {
256            axis.values =
257                axis.valueSet.toArray(new Comparable[axis.valueSet.size()]);
258        }
259
260        // Populate cells.
261        //
262        // (This is a rough implementation, very inefficient. It makes all
263        // segment types pretend to be sparse, for purposes of reading. It
264        // maps all axis ordinals to a value, then back to an axis ordinal,
265        // even if this translation were not necessary, say if the source and
266        // target axes had the same set of values. And it always creates a
267        // sparse segment.
268        //
269        // We should do really efficient rollup if the source is an array: we
270        // should box values (e.g double to Double and back), and we should read
271        // a stripe of values from the and add them up into a single cell.
272        final Map<CellKey, List<Object>> cellValues =
273            new HashMap<CellKey, List<Object>>();
274        for (Map.Entry<SegmentHeader, SegmentBody> entry : map.entrySet()) {
275            final int[] pos = new int[axes.length];
276            final Comparable[][] valueArrays =
277                new Comparable[firstHeader.getConstrainedColumns().size()][];
278            final SegmentBody body = entry.getValue();
279
280            // Copy source value sets into arrays. For axes that are being
281            // projected away, store null.
282            z = 0;
283            for (SortedSet<Comparable> set : body.getAxisValueSets()) {
284                valueArrays[z] = keepColumns.contains(
285                    firstHeader.getConstrainedColumns().get(z).columnExpression)
286                        ? set.toArray(new Comparable[set.size()])
287                        : null;
288                ++z;
289            }
290            Map<CellKey, Object> v = body.getValueMap();
291            entryLoop:
292            for (Map.Entry<CellKey, Object> vEntry : v.entrySet()) {
293                z = 0;
294                for (int i = 0; i < vEntry.getKey().size(); i++) {
295                    final Comparable[] valueArray = valueArrays[i];
296                    if (valueArray == null) {
297                        continue;
298                    }
299                    final int ordinal = vEntry.getKey().getOrdinals()[i];
300                    final int targetOrdinal;
301                    if (axes[z].hasNull && ordinal == valueArray.length) {
302                        targetOrdinal = axes[z].valueSet.size();
303                    } else {
304                        final Comparable value = valueArray[ordinal];
305                        if (value == null) {
306                            targetOrdinal = axes[z].valueSet.size();
307                        } else {
308                            targetOrdinal =
309                                Util.binarySearch(
310                                    axes[z].values,
311                                    0, axes[z].values.length,
312                                    value);
313                        }
314                    }
315                    if (targetOrdinal >= 0) {
316                        pos[z++] = targetOrdinal;
317                    } else {
318                        // This happens when one of the rollup candidate doesn't
319                        // contain the requested cell.
320                        continue entryLoop;
321                    }
322                }
323                final CellKey ck = CellKey.Generator.newCellKey(pos);
324                if (!cellValues.containsKey(ck)) {
325                    cellValues.put(ck, new ArrayList<Object>());
326                }
327                cellValues.get(ck).add(vEntry.getValue());
328            }
329        }
330
331        // Build the axis list.
332        final List<Pair<SortedSet<Comparable>, Boolean>> axisList =
333            new ArrayList<Pair<SortedSet<Comparable>, Boolean>>();
334        BigInteger bigValueCount = BigInteger.ONE;
335        for (AxisInfo axis : axes) {
336            axisList.add(Pair.of(axis.valueSet, axis.hasNull));
337            int size = axis.values.length;
338            if (axis.hasNull) {
339                ++size;
340            }
341            bigValueCount = bigValueCount.multiply(
342                BigInteger.valueOf(axis.hasNull ? size + 1 : size));
343        }
344
345        // The logic used here for the sparse check follows
346        // SegmentLoader.setAxisDataAndDecideSparseUse.
347        // The two methods use different data structures (AxisInfo/SegmentAxis)
348        // so combining logic is probably more trouble than it's worth.
349        final boolean sparse =
350            bigValueCount.compareTo(BigInteger.valueOf(Integer.MAX_VALUE)) > 0
351                || SegmentLoader.useSparse(
352                    bigValueCount.doubleValue(), cellValues.size());
353
354        final int[] axisMultipliers =
355            computeAxisMultipliers(axisList);
356
357        final SegmentBody body;
358        // Peak at the values and determine the best way to store them
359        // (whether to use a dense native dataset or a sparse one.
360        if (cellValues.size() == 0) {
361            // Just store the data into an empty dense object dataset.
362            body =
363                new DenseObjectSegmentBody(
364                    new Object[0],
365                    axisList);
366        } else if (sparse) {
367            // The rule says we must use a sparse dataset.
368            // First, aggregate the values of each key.
369            final Map<CellKey, Object> data =
370                new HashMap<CellKey, Object>();
371            for (Entry<CellKey, List<Object>> entry
372                : cellValues.entrySet())
373            {
374                data.put(
375                    CellKey.Generator.newCellKey(entry.getKey().getOrdinals()),
376                    rollupAggregator.aggregate(
377                        entry.getValue(),
378                        datatype));
379            }
380            body =
381                new SparseSegmentBody(
382                    data,
383                    axisList);
384        } else {
385            final BitSet nullValues;
386            final int valueCount = bigValueCount.intValue();
387            switch (datatype) {
388            case Integer:
389                final int[] ints = new int[valueCount];
390                nullValues = Util.bitSetBetween(0, valueCount);
391                for (Entry<CellKey, List<Object>> entry
392                    : cellValues.entrySet())
393                {
394                    final int offset =
395                        CellKey.Generator.getOffset(
396                            entry.getKey().getOrdinals(), axisMultipliers);
397                    final Object value =
398                        rollupAggregator.aggregate(
399                            entry.getValue(),
400                            datatype);
401                    if (value != null) {
402                        ints[offset] = (Integer) value;
403                        nullValues.clear(offset);
404                    }
405                }
406                body =
407                    new DenseIntSegmentBody(
408                        nullValues,
409                        ints,
410                        axisList);
411                  break;
412            case Numeric:
413                final double[] doubles = new double[valueCount];
414                nullValues = Util.bitSetBetween(0, valueCount);
415                for (Entry<CellKey, List<Object>> entry
416                    : cellValues.entrySet())
417                {
418                    final int offset =
419                        CellKey.Generator.getOffset(
420                            entry.getKey().getOrdinals(), axisMultipliers);
421                    final Object value =
422                        rollupAggregator.aggregate(
423                            entry.getValue(),
424                            datatype);
425                    if (value != null) {
426                        doubles[offset] = (Double) value;
427                        nullValues.clear(offset);
428                    }
429                }
430                body =
431                    new DenseDoubleSegmentBody(
432                        nullValues,
433                        doubles,
434                        axisList);
435                break;
436            default:
437                final Object[] objects = new Object[valueCount];
438                for (Entry<CellKey, List<Object>> entry
439                    : cellValues.entrySet())
440                {
441                    final int offset =
442                        CellKey.Generator.getOffset(
443                            entry.getKey().getOrdinals(), axisMultipliers);
444                    objects[offset] =
445                        rollupAggregator.aggregate(
446                            entry.getValue(),
447                            datatype);
448                }
449                body =
450                    new DenseObjectSegmentBody(
451                        objects,
452                        axisList);
453            }
454        }
455
456        // Create header.
457        final List<SegmentColumn> constrainedColumns =
458            new ArrayList<SegmentColumn>();
459        for (int i = 0; i < axes.length; i++) {
460            AxisInfo axisInfo = axes[i];
461            constrainedColumns.add(
462                new SegmentColumn(
463                    axisInfo.column.getColumnExpression(),
464                    axisInfo.column.getValueCount(),
465                    axisInfo.lostPredicate
466                        ? axisList.get(i).left
467                        : axisInfo.column.values));
468        }
469        final SegmentHeader header =
470            new SegmentHeader(
471                firstHeader.schemaName,
472                firstHeader.schemaChecksum,
473                firstHeader.cubeName,
474                firstHeader.measureName,
475                constrainedColumns,
476                firstHeader.compoundPredicates,
477                firstHeader.rolapStarFactTableName,
478                targetBitkey,
479                Collections.<SegmentColumn>emptyList());
480
481        return Pair.of(header, body);
482    }
483
484    private static int[] computeAxisMultipliers(
485        List<Pair<SortedSet<Comparable>, Boolean>> axes)
486    {
487        final int[] axisMultipliers = new int[axes.size()];
488        int multiplier = 1;
489        for (int i = axes.size() - 1; i >= 0; --i) {
490            axisMultipliers[i] = multiplier;
491            multiplier *= axes.get(i).left.size();
492        }
493        return axisMultipliers;
494    }
495
496    private static class ExcludedRegionList
497        extends AbstractList<Segment.ExcludedRegion>
498        implements Segment.ExcludedRegion
499    {
500        private final int cellCount;
501        private final SegmentHeader header;
502        public ExcludedRegionList(SegmentHeader header) {
503            this.header = header;
504            int cellCount = 1;
505            for (SegmentColumn cc : header.getExcludedRegions()) {
506                // TODO find a way to approximate the cardinality
507                // of wildcard columns.
508                if (cc.values != null) {
509                    cellCount *= cc.values.size();
510                }
511            }
512            this.cellCount = cellCount;
513        }
514
515        public void describe(StringBuilder buf) {
516            // TODO
517        }
518
519        public int getArity() {
520            return header.getConstrainedColumns().size();
521        }
522
523        public int getCellCount() {
524            return cellCount;
525        }
526
527        public boolean wouldContain(Object[] keys) {
528            assert keys.length == header.getConstrainedColumns().size();
529            for (int i = 0; i < keys.length; i++) {
530                final SegmentColumn excl =
531                    header.getExcludedRegion(
532                        header.getConstrainedColumns().get(i).columnExpression);
533                if (excl == null) {
534                    continue;
535                }
536                if (excl.values.contains(keys[i])) {
537                    return true;
538                }
539            }
540            return false;
541        }
542
543        public ExcludedRegion get(int index) {
544            return this;
545        }
546
547        public int size() {
548            return 1;
549        }
550    }
551
552    /**
553     * Tells if the passed segment is a subset of this segment
554     * and could be used for a rollup in cache operation.
555     * @param segment A segment which might be a subset of the
556     * current segment.
557     * @return True or false.
558     */
559    public static boolean isSubset(
560        SegmentHeader header,
561        Segment segment)
562    {
563        if (!segment.getStar().getSchema().getName()
564            .equals(header.schemaName))
565        {
566            return false;
567        }
568        if (!segment.getStar().getFactTable().getAlias()
569                .equals(header.rolapStarFactTableName))
570        {
571            return false;
572        }
573        if (!segment.measure.getName().equals(header.measureName)) {
574            return false;
575        }
576        if (!segment.measure.getCubeName().equals(header.cubeName)) {
577            return false;
578        }
579        if (segment.getConstrainedColumnsBitKey()
580                .equals(header.constrainedColsBitKey))
581        {
582            return true;
583        }
584        return false;
585    }
586
587    public static List<SegmentColumn> toConstrainedColumns(
588        StarColumnPredicate[] predicates)
589    {
590        return toConstrainedColumns(
591            Arrays.asList(predicates));
592    }
593
594    public static List<SegmentColumn> toConstrainedColumns(
595        Collection<StarColumnPredicate> predicates)
596    {
597        List<SegmentColumn> ccs =
598            new ArrayList<SegmentColumn>();
599        for (StarColumnPredicate predicate : predicates) {
600            final List<Comparable> values =
601                new ArrayList<Comparable>();
602            predicate.values(Util.cast(values));
603            final Comparable[] valuesArray =
604                values.toArray(new Comparable[values.size()]);
605            if (valuesArray.length == 1 && valuesArray[0].equals(true)) {
606                ccs.add(
607                    new SegmentColumn(
608                        predicate.getConstrainedColumn()
609                            .getExpression().getGenericExpression(),
610                        predicate.getConstrainedColumn().getCardinality(),
611                        null));
612            } else {
613                Arrays.sort(
614                    valuesArray,
615                    Util.SqlNullSafeComparator.instance);
616                ccs.add(
617                    new SegmentColumn(
618                        predicate.getConstrainedColumn()
619                            .getExpression().getGenericExpression(),
620                        predicate.getConstrainedColumn().getCardinality(),
621                        new ArraySortedSet(valuesArray)));
622            }
623        }
624        return ccs;
625    }
626
627    /**
628     * Creates a SegmentHeader object describing the supplied
629     * Segment object.
630     *
631     * @param segment A segment object for which we want to generate
632     * a SegmentHeader.
633     * @return A SegmentHeader describing the supplied Segment object.
634     */
635    public static SegmentHeader toHeader(Segment segment) {
636        final List<SegmentColumn> cc =
637            SegmentBuilder.toConstrainedColumns(segment.predicates);
638        final List<String> cp = new ArrayList<String>();
639
640        StringBuilder buf = new StringBuilder();
641
642        for (StarPredicate compoundPredicate : segment.compoundPredicateList) {
643            buf.setLength(0);
644            SqlQuery query =
645                new SqlQuery(
646                    segment.star.getSqlQueryDialect());
647            compoundPredicate.toSql(query, buf);
648            cp.add(buf.toString());
649        }
650        final RolapSchema schema = segment.star.getSchema();
651        return new SegmentHeader(
652            schema.getName(),
653            schema.getChecksum(),
654            segment.measure.getCubeName(),
655            segment.measure.getName(),
656            cc,
657            cp,
658            segment.star.getFactTable().getAlias(),
659            segment.constrainedColumnsBitKey,
660            Collections.<SegmentColumn>emptyList());
661    }
662
663    private static RolapStar.Column[] getConstrainedColumns(
664        RolapStar star,
665        BitKey bitKey)
666    {
667        final List<RolapStar.Column> list =
668            new ArrayList<RolapStar.Column>();
669        for (int bit : bitKey) {
670            list.add(star.getColumn(bit));
671        }
672        return list.toArray(new RolapStar.Column[list.size()]);
673    }
674
675    /**
676     * Functor to convert a segment header and body into a
677     * {@link mondrian.rolap.agg.SegmentWithData}.
678     */
679    public static interface SegmentConverter {
680        SegmentWithData convert(
681            SegmentHeader header,
682            SegmentBody body);
683    }
684
685    /**
686     * Implementation of {@link SegmentConverter} that uses an
687     * {@link mondrian.rolap.agg.AggregationKey}
688     * and {@link mondrian.rolap.agg.CellRequest} as context to
689     * convert a {@link mondrian.spi.SegmentHeader}.
690     *
691     * <p>This is nasty. A converter might be used for several headers, not
692     * necessarily with the context as the cell request and aggregation key.
693     * Converters only exist for fact tables and compound predicate combinations
694     * for which we have already done a load request.</p>
695     *
696     * <p>It would be much better if there was a way to convert compound
697     * predicates from strings to predicates. Then we could obsolete the
698     * messy context inside converters, and maybe obsolete converters
699     * altogether.</p>
700     */
701    public static class SegmentConverterImpl implements SegmentConverter {
702        private final AggregationKey key;
703        private final CellRequest request;
704
705        public SegmentConverterImpl(AggregationKey key, CellRequest request) {
706            this.key = key;
707            this.request = request;
708        }
709
710        public SegmentWithData convert(
711            SegmentHeader header,
712            SegmentBody body)
713        {
714            final Segment segment =
715                toSegment(
716                    header,
717                    key.getStar(),
718                    header.getConstrainedColumnsBitKey(),
719                    getConstrainedColumns(
720                        key.getStar(),
721                        header.getConstrainedColumnsBitKey()),
722                    request.getMeasure(),
723                    key.getCompoundPredicateList());
724            return addData(segment, body);
725        }
726    }
727
728    /**
729     * Implementation of {@link SegmentConverter} that uses a star measure
730     * and a list of {@link StarPredicate}.
731     */
732    public static class StarSegmentConverter implements SegmentConverter {
733        private final WeakReference<RolapStar.Measure> measure;
734        private final List<StarPredicate> compoundPredicateList;
735
736        public StarSegmentConverter(
737            RolapStar.Measure measure,
738            List<StarPredicate> compoundPredicateList)
739        {
740            // The measure is wrapped in a weak reference because
741            // converters are put into the SegmentCacheIndex,
742            // but the registry of indexes is based as a weak
743            // list of the RolapStars.
744            // Simply put, the fact that converters have a hard
745            // link on the measure would prevents the GC from
746            // ever cleaning the registry. The circular references
747            // are a well known issue with weak lists.
748            // It is harmless to use a weak reference here because
749            // the measure is referenced by cubes and what-not,
750            // so it can't be GC'd before its time has come.
751            this.measure = new WeakReference<RolapStar.Measure>(measure);
752            this.compoundPredicateList = compoundPredicateList;
753        }
754
755        public SegmentWithData convert(
756            SegmentHeader header,
757            SegmentBody body)
758        {
759            RolapStar.Measure m1 = measure.get();
760            assert m1 != null
761                : "Invalid state. A reference to the measure object was picked up by GC but the index wasn't cleared. This shouldn't happen.";
762            final Segment segment =
763                toSegment(
764                    header,
765                    m1.getStar(),
766                    header.getConstrainedColumnsBitKey(),
767                    getConstrainedColumns(
768                        m1.getStar(),
769                        header.getConstrainedColumnsBitKey()),
770                    m1,
771                    compoundPredicateList);
772            return addData(segment, body);
773        }
774    }
775}
776
777// End SegmentBuilder.java