001/*
002// This software is subject to the terms of the Eclipse Public License v1.0
003// Agreement, available at the following URL:
004// http://www.eclipse.org/legal/epl-v10.html.
005// You must accept the terms of that agreement to use this software.
006//
007// Copyright (C) 2011-2013 Pentaho
008// All Rights Reserved.
009*/
010package mondrian.rolap.agg;
011
012import mondrian.olap.*;
013import mondrian.olap.CacheControl.CellRegion;
014import mondrian.resource.MondrianResource;
015import mondrian.rolap.*;
016import mondrian.rolap.cache.*;
017import mondrian.server.Execution;
018import mondrian.server.Locus;
019import mondrian.server.monitor.*;
020import mondrian.spi.*;
021import mondrian.util.*;
022
023import org.apache.log4j.Logger;
024
025import java.io.PrintWriter;
026import java.util.*;
027import java.util.Map.Entry;
028import java.util.concurrent.*;
029
030/**
 * Active object that maintains the "global cache" (in JVM, but shared between
 * connections using a particular schema) and "external cache" (as implemented
 * by a {@link mondrian.spi.SegmentCache}).
034 *
035 * <p>Segment states</p>
036 *
037 * <table>
038 *     <tr><th>State</th><th>Meaning</th></tr>
039 *     <tr><td>Local</td><td>Initial state of a segment</td></tr>
040 * </table>
041 *
042 * <h2>Decisions to be reviewed</h2>
043 *
044 * <p>1. Create variant of actor that processes all requests synchronously,
045 * and does not need a thread. This would be a more 'embedded' mode of operation
046 * (albeit with worse scale-out).</p>
047 *
048 * <p>2. Move functionality into AggregationManager?</p>
049 *
050 * <p>3. Delete {@link mondrian.rolap.RolapStar#lookupOrCreateAggregation}
051 * and {@link mondrian.rolap.RolapStar#lookupSegment}
052 * and {@link mondrian.rolap.RolapStar}.lookupAggregationShared
053 * (formerly RolapStar.lookupAggregation).</p>
054 *
055 *
056 *
057 *
058 * <h2>Moved methods</h2>
059 *
060 * <p>(Keeping track of where methods came from will make it easier to merge
061 * to the mondrian-4 code line.)</p>
062 *
063 * <p>1. {@link mondrian.rolap.RolapStar#getCellFromCache} moved from
064 * {@link Aggregation}.getCellValue</p>
065 *
066 *
067 *
068 * <h2>Done</h2>
069 *
070 * <p>1. Obsolete CountingAggregationManager, and property
071 * mondrian.rolap.agg.enableCacheHitCounters.</p>
072 *
073 * <p>2. AggregationManager becomes non-singleton.</p>
074 *
075 * <p>3. SegmentCacheWorker methods and segmentCache field become
076 * non-static. initCache() is called on construction. SegmentCache is passed
077 * into constructor (therefore move ServiceDiscovery into
078 * client). AggregationManager (or maybe MondrianServer) is another constructor
079 * parameter.</p>
080 *
081 * <p>5. Move SegmentHeader, SegmentBody, ConstrainedColumn into
082 * mondrian.spi. Leave behind dependencies on mondrian.rolap.agg. In particular,
083 * put code that converts Segment + SegmentWithData to and from SegmentHeader
084 * + SegmentBody (e.g. {@link SegmentHeader}#forSegment) into a utility class.
085 * (Do this as CLEANUP, after functionality is complete?)</p>
086 *
087 * <p>6. Move functionality Aggregation to Segment. Long-term, Aggregation
088 * should not be used as a 'gatekeeper' to Segment. Remove Aggregation fields
089 * columns and axes.</p>
090 *
091 * <p>9. Obsolete {@link RolapStar#cacheAggregations}. Similar effect will be
092 * achieved by removing the 'jvm cache' from the chain of caches.</p>
093 *
094 * <p>10. Rename Aggregation.Axis to SegmentAxis.</p>
095 *
096 * <p>11. Remove Segment.setData and instead split out subclass
097 * SegmentWithData. Now segment is immutable. You don't have to wait for its
098 * state to change. You wait for a Future&lt;SegmentWithData&gt; to become
099 * ready.</p>
100 *
101 * <p>12. Remove methods: RolapCube.checkAggregateModifications,
102 * RolapStar.checkAggregateModifications,
103 * RolapSchema.checkAggregateModifications,
104 * RolapStar.pushAggregateModificationsToGlobalCache,
105 * RolapSchema.pushAggregateModificationsToGlobalCache,
106 * RolapCube.pushAggregateModificationsToGlobalCache.</p>
107 *
108 * <p>13. Add new implementations of Future: CompletedFuture and SlotFuture.</p>
109 *
 * <p>14. Remove methods:</p>
111 * <ul>
112 *
113 * <li>Remove {@link SegmentLoader}.loadSegmentsFromCache - creates a
114 *   {@link SegmentHeader} that has PRECISELY same specification as the
115 *   requested segment, very unlikely to have a hit</li>
116 *
117 * <li>Remove {@link SegmentLoader}.loadSegmentFromCacheRollup</li>
118 *
119 * <li>Break up {@link SegmentLoader}.cacheSegmentData, and
120 *   place code that is called after a segment has arrived</li>
121 *
122 * </ul>
123 *
124 * <p>13. Fix flush. Obsolete {@link Aggregation}.flush, and
125 * {@link RolapStar}.flush, which called it.</p>
126 *
127 * <p>18. {@code SegmentCacheManager#locateHeaderBody} (and maybe other
128 * methods) call {@link SegmentCacheWorker#get}, and that's a slow blocking
 * call. Waits for segment futures should be made from a worker or
 * client, not an agent.</p>
131 *
132 *
133 * <h2>Ideas and tasks</h2>
134 *
135 * <p>7. RolapStar.localAggregations and .sharedAggregations. Obsolete
136 * sharedAggregations.</p>
137 *
138 * <p>8. Longer term. Move {@link mondrian.rolap.RolapStar.Bar}.segmentRefs to
139 * {@link mondrian.server.Execution}. Would it still be thread-local?</p>
140 *
141 * <p>10. Call
142 * {@link mondrian.spi.DataSourceChangeListener#isAggregationChanged}.
143 * Previously called from
144 * {@link RolapStar}.checkAggregateModifications, now never called.</p>
145 *
146 * <p>12. We can quickly identify segments affected by a flush using
147 * {@link SegmentCacheIndex#intersectRegion}. But then what? Options:</p>
148 *
149 * <ol>
150 *
151 * <li>Option #1. Pull them in, trim them, write them out? But: causes
152 *     a lot of I/O, and we may never use these
153 *     segments. Easiest.</li>
154 *
155 * <li>Option #2. Mark the segments in the index as needing to be trimmed; trim
156 *     them when read, and write out again. But: doesn't propagate to other
157 *     nodes.</li>
158 *
159 * <li>Option #3. (Best?) Write a mapping SegmentHeader->Restrictions into the
160 *     cache.  Less I/O than #1. Method
161 *     "SegmentCache.addRestriction(SegmentHeader, CacheRegion)"</li>
162 *
163 * </ol>
164 *
165 * <p>14. Move {@link AggregationManager#getCellFromCache} somewhere else.
166 *   It's concerned with local segments, not the global/external cache.</p>
167 *
168 * <p>15. Method to convert SegmentHeader + SegmentBody to Segment +
169 * SegmentWithData is imperfect. Cannot parse predicates, compound predicates.
170 * Need mapping in star to do it properly and efficiently?
171 * {@link mondrian.rolap.agg.SegmentBuilder.SegmentConverter} is a hack that
172 * can be removed when this is fixed.
173 * See {@link SegmentBuilder#toSegment}. Also see #20.</p>
174 *
175 * <p>17. Revisit the strategy for finding segments that can be copied from
176 * global and external cache into local cache. The strategy of sending N
177 * {@link CellRequest}s at a time, then executing SQL to fill in the gaps, is
178 * flawed. We need to maximize N in order to reduce segment fragmentation, but
179 * if too high, we blow memory. BasicQueryTest.testAnalysis is an example of
180 * this. Instead, we should send cell-requests in batches (is ~1000 the right
181 * size?), identify those that can be answered from global or external cache,
182 * return those segments, but not execute SQL until the end of the phase.
 * If so, {@link CellRequestQuantumExceededException} can be obsoleted.</p>
184 *
185 * <p>19. Tracing.
186 * a. Remove or re-purpose {@link FastBatchingCellReader#pendingCount};
187 * b. Add counter to measure requests satisfied by calling
188 * {@link mondrian.rolap.agg.SegmentCacheManager#peek}.</p>
189 *
190 * <p>20. Obsolete {@link SegmentDataset} and its implementing classes.
191 * {@link SegmentWithData} can use {@link SegmentBody} instead. Will save
192 * copying.</p>
193 *
194 * <p>21. Obsolete {@link mondrian.util.CombiningGenerator}.</p>
195 *
196 * <p>22. {@link SegmentHeader#constrain(mondrian.spi.SegmentColumn[])} is
197 * broken for N-dimensional regions where N &gt; 1. Each call currently
198 * creates N more 1-dimensional regions, but should create 1 more N-dimensional
199 * region. {@link SegmentHeader#excludedRegions} should be a list of
200 * {@link SegmentColumn} arrays.</p>
201 *
202 * <p>23. All code that calls {@link Future#get} should probably handle
203 * {@link CancellationException}.</p>
204 *
205 * <p>24. Obsolete {@link #handler}. Indirection doesn't win anything.</p>
206 *
207 *
208 * @author jhyde
209 */
210public class SegmentCacheManager {
    // Visitor handed to the actor together with every command and event
    // submitted by this manager.
    private final Handler handler = new Handler();
    // Actor instance; created in the constructor and run by {@link #thread}.
    private final Actor ACTOR;
    // Thread that runs ACTOR. The constructor marks it as a daemon and starts
    // it; the same thread is also passed to each SegmentCacheWorker.
    public final Thread thread;
214
    /**
     * Executor with which to send requests to external caches.
     *
     * <p>Its first argument is read from the
     * {@code SegmentCacheManagerNumberCacheThreads} property (presumably the
     * maximum number of threads; confirm against
     * {@code Util.getExecutorService}). When the executor rejects a task, the
     * handler below throws the {@code SegmentCacheLimitReached} error.</p>
     */
    public final ExecutorService cacheExecutor =
        Util.getExecutorService(
            MondrianProperties.instance()
                .SegmentCacheManagerNumberCacheThreads.get(),
            0, 1,
            "mondrian.rolap.agg.SegmentCacheManager$cacheExecutor",
            new RejectedExecutionHandler() {
                public void rejectedExecution(
                    Runnable r,
                    ThreadPoolExecutor executor)
                {
                    // Surface the rejection to the submitter as an error
                    // rather than silently dropping the task.
                    throw MondrianResource.instance()
                        .SegmentCacheLimitReached.ex();
                }
            });
233
    /**
     * Executor with which to execute SQL requests.
     *
     * <p>Its first argument is read from the
     * {@code SegmentCacheManagerNumberSqlThreads} property (presumably the
     * maximum number of threads; confirm against
     * {@code Util.getExecutorService}). When the executor rejects a task, the
     * handler below throws the {@code SqlQueryLimitReached} error.</p>
     *
     * <p>TODO: create using factory and/or configuration parameters. Executor
     * should be shared within MondrianServer or target JDBC database.
     */
    public final ExecutorService sqlExecutor =
        Util.getExecutorService(
            MondrianProperties.instance()
                .SegmentCacheManagerNumberSqlThreads.get(),
            0, 1,
            "mondrian.rolap.agg.SegmentCacheManager$sqlExecutor",
            new RejectedExecutionHandler() {
                public void rejectedExecution(
                    Runnable r,
                    ThreadPoolExecutor executor)
                {
                    // Surface the rejection to the submitter as an error
                    // rather than silently dropping the task.
                    throw MondrianResource.instance()
                        .SqlQueryLimitReached.ex();
                }
            });
255
    // Workers wrapping each segment cache: the constructor adds one for the
    // in-memory cache (when enabled) and one per configured external cache.
    // NOTE: This list is only mutable for testing purposes. Would rather it
    // were immutable.
    public final List<SegmentCacheWorker> segmentCacheWorkers =
        new CopyOnWriteArrayList<SegmentCacheWorker>();

    // Composite cache built over segmentCacheWorkers at the end of the
    // constructor.
    public final SegmentCache compositeCache;
    // Registry used to look up a segment index by star or by segment header.
    private final SegmentCacheIndexRegistry indexRegistry;

    // NOTE(review): the logger is created for AggregationManager, not for
    // SegmentCacheManager. This looks like a copy-paste leftover, but logging
    // configuration may depend on the category, so confirm before changing.
    private static final Logger LOGGER =
        Logger.getLogger(AggregationManager.class);
    // Server passed to the constructor.
    private final MondrianServer server;
267
268    public SegmentCacheManager(MondrianServer server) {
269        this.server = server;
270        ACTOR = new Actor();
271        thread = new Thread(
272            ACTOR, "mondrian.rolap.agg.SegmentCacheManager$ACTOR");
273        thread.setDaemon(true);
274        thread.start();
275
276        // Create the index registry.
277        this.indexRegistry = new SegmentCacheIndexRegistry();
278
279        // Add a local cache, if needed.
280        if (!MondrianProperties.instance().DisableLocalSegmentCache.get()
281            && !MondrianProperties.instance().DisableCaching.get())
282        {
283            final MemorySegmentCache cache = new MemorySegmentCache();
284            segmentCacheWorkers.add(
285                new SegmentCacheWorker(cache, thread));
286        }
287
288        // Add an external cache, if configured.
289        final List<SegmentCache> externalCache = SegmentCacheWorker.initCache();
290        for (SegmentCache cache : externalCache) {
291            // Create a worker for this external cache
292            segmentCacheWorkers.add(
293                new SegmentCacheWorker(cache, thread));
294            // Hook up a listener so it can update
295            // the segment index.
296            cache.addListener(
297                new AsyncCacheListener(this, server));
298        }
299
300        compositeCache = new CompositeSegmentCache(segmentCacheWorkers);
301    }
302
303    public <T> T execute(Command<T> command) {
304        return ACTOR.execute(handler, command);
305    }
306
307    public SegmentCacheIndexRegistry getIndexRegistry() {
308        return indexRegistry;
309    }
310
311    /**
312     * Adds a segment to segment index.
313     *
314     * <p>Called when a SQL statement has finished loading a segment.</p>
315     *
316     * <p>Does not add the segment to the external cache. That is a potentially
317     * long-duration operation, better carried out by a worker.</p>
318     *
319     * @param header segment header
320     * @param body segment body
321     */
322    public void loadSucceeded(
323        RolapStar star,
324        SegmentHeader header,
325        SegmentBody body)
326    {
327        final Locus locus = Locus.peek();
328        ACTOR.event(
329            handler,
330            new SegmentLoadSucceededEvent(
331                System.currentTimeMillis(),
332                locus.getServer().getMonitor(),
333                locus.getServer().getId(),
334                locus.execution.getMondrianStatement()
335                    .getMondrianConnection().getId(),
336                locus.execution.getMondrianStatement().getId(),
337                locus.execution.getId(),
338                star,
339                header,
340                body));
341    }
342
343    /**
344     * Informs cache manager that a segment load failed.
345     *
346     * <p>Called when a SQL statement receives an error while loading a
347     * segment.</p>
348     *
349     * @param header segment header
350     * @param throwable Error
351     */
352    public void loadFailed(
353        RolapStar star,
354        SegmentHeader header,
355        Throwable throwable)
356    {
357        final Locus locus = Locus.peek();
358        ACTOR.event(
359            handler,
360            new SegmentLoadFailedEvent(
361                System.currentTimeMillis(),
362                locus.getServer().getMonitor(),
363                locus.getServer().getId(),
364                locus.execution.getMondrianStatement()
365                    .getMondrianConnection().getId(),
366                locus.execution.getMondrianStatement().getId(),
367                locus.execution.getId(),
368                star,
369                header,
370                throwable));
371    }
372
373    /**
374     * Removes a segment from segment index.
375     *
376     * <p>Call is asynchronous. It comes back immediately.</p>
377     *
378     * <p>Does not remove it from the external cache.</p>
379     *
380     * @param header segment header
381     */
382    public void remove(
383        RolapStar star,
384        SegmentHeader header)
385    {
386        final Locus locus = Locus.peek();
387        ACTOR.event(
388            handler,
389            new SegmentRemoveEvent(
390                System.currentTimeMillis(),
391                locus.getServer().getMonitor(),
392                locus.getServer().getId(),
393                locus.execution.getMondrianStatement()
394                    .getMondrianConnection().getId(),
395                locus.execution.getMondrianStatement().getId(),
396                locus.execution.getId(),
397                this,
398                star,
399                header));
400    }
401
402    /**
403     * Tells the cache that a segment is newly available in an external cache.
404     */
405    public void externalSegmentCreated(
406        SegmentHeader header,
407        MondrianServer server)
408    {
409        if (MondrianProperties.instance().DisableCaching.get()) {
410            // Ignore cache requests.
411            return;
412        }
413        ACTOR.event(
414            handler,
415            new ExternalSegmentCreatedEvent(
416                System.currentTimeMillis(),
417                server.getMonitor(),
418                server.getId(),
419                0,
420                0,
421                0,
422                this,
423                header));
424    }
425
426    /**
427     * Tells the cache that a segment is no longer available in an external
428     * cache.
429     */
430    public void externalSegmentDeleted(
431        SegmentHeader header,
432        MondrianServer server)
433    {
434        if (MondrianProperties.instance().DisableCaching.get()) {
435            // Ignore cache requests.
436            return;
437        }
438        ACTOR.event(
439            handler,
440            new ExternalSegmentDeletedEvent(
441                System.currentTimeMillis(),
442                server.getMonitor(),
443                server.getId(),
444                0,
445                0,
446                0,
447                this,
448                header));
449    }
450
451    public void printCacheState(
452        CellRegion region,
453        PrintWriter pw,
454        Locus locus)
455    {
456        ACTOR.execute(
457            handler,
458            new PrintCacheStateCommand(region, pw, locus));
459    }
460
461    /**
462     * Shuts down this cache manager and all active threads and indexes.
463     */
464    public void shutdown() {
465        execute(new ShutdownCommand());
466        cacheExecutor.shutdown();
467        sqlExecutor.shutdown();
468    }
469
470    public SegmentBuilder.SegmentConverter getConverter(
471        RolapStar star,
472        SegmentHeader header)
473    {
474        return indexRegistry.getIndex(star)
475                .getConverter(
476                    header.schemaName,
477                    header.schemaChecksum,
478                    header.cubeName,
479                    header.rolapStarFactTableName,
480                    header.measureName,
481                    header.compoundPredicates);
482    }
483
484    /**
485     * Makes a quick request to the aggregation manager to see whether the
486     * cell value required by a particular cell request is in external cache.
487     *
488     * <p>'Quick' is relative. It is an asynchronous request (due to
489     * the aggregation manager being an actor) and therefore somewhat slow. If
490     * the segment is in cache, will save batching up future requests and
491     * re-executing the query. Win should be particularly noticeable for queries
492     * running on a populated cache. Without this feature, every query would
493     * require at least two iterations.</p>
494     *
495     * <p>Request does not issue SQL to populate the segment. Nor does it
496     * try to find existing segments for rollup. Those operations can wait until
497     * next phase.</p>
498     *
499     * <p>Client is responsible for adding the segment to its private cache.</p>
500     *
501     * @param request Cell request
502     * @return Segment with data, or null if not in cache
503     */
504    public SegmentWithData peek(final CellRequest request) {
505        final SegmentCacheManager.PeekResponse response =
506            execute(
507                new PeekCommand(request, Locus.peek()));
508        for (SegmentHeader header : response.headerMap.keySet()) {
509            final SegmentBody body = compositeCache.get(header);
510            if (body != null) {
511                final SegmentBuilder.SegmentConverter converter =
512                    response.converterMap.get(
513                        SegmentCacheIndexImpl.makeConverterKey(header));
514                if (converter != null) {
515                    return converter.convert(header, body);
516                }
517            }
518        }
519        for (Map.Entry<SegmentHeader, Future<SegmentBody>> entry
520            : response.headerMap.entrySet())
521        {
522            final Future<SegmentBody> bodyFuture = entry.getValue();
523            if (bodyFuture != null) {
524                final SegmentBody body =
525                    Util.safeGet(
526                        bodyFuture,
527                        "Waiting for segment to load");
528                final SegmentHeader header = entry.getKey();
529                final SegmentBuilder.SegmentConverter converter =
530                    response.converterMap.get(
531                        SegmentCacheIndexImpl.makeConverterKey(header));
532                if (converter != null) {
533                    return converter.convert(header, body);
534                }
535            }
536        }
537        return null;
538    }
539
    /**
     * Visitor for messages (commands and events).
     *
     * <p>Declares one {@code visit} overload per event type.</p>
     */
    public interface Visitor {
        /** Handles notification that a segment finished loading. */
        void visit(SegmentLoadSucceededEvent event);
        /** Handles notification that loading a segment failed. */
        void visit(SegmentLoadFailedEvent event);
        /** Handles a request to remove a segment. */
        void visit(SegmentRemoveEvent event);
        /** Handles notification that a segment appeared in an external cache. */
        void visit(ExternalSegmentCreatedEvent event);
        /** Handles notification that a segment left an external cache. */
        void visit(ExternalSegmentDeletedEvent event);
    }
550
551    private class Handler implements Visitor {
552        public void visit(SegmentLoadSucceededEvent event) {
553            indexRegistry.getIndex(event.star)
554                .loadSucceeded(
555                    event.header,
556                    event.body);
557
558            event.monitor.sendEvent(
559                new CellCacheSegmentCreateEvent(
560                    event.timestamp,
561                    event.serverId,
562                    event.connectionId,
563                    event.statementId,
564                    event.executionId,
565                    event.header.getConstrainedColumns().size(),
566                    event.body == null
567                        ? 0
568                        : event.body.getValueMap().size(),
569                    CellCacheSegmentCreateEvent.Source.SQL));
570        }
571
572        public void visit(SegmentLoadFailedEvent event) {
573            indexRegistry.getIndex(event.star)
574                .loadFailed(
575                    event.header,
576                    event.throwable);
577        }
578
579        public void visit(final SegmentRemoveEvent event) {
580            indexRegistry.getIndex(event.star)
581                .remove(event.header);
582
583            event.monitor.sendEvent(
584                new CellCacheSegmentDeleteEvent(
585                    event.timestamp,
586                    event.serverId,
587                    event.connectionId,
588                    event.statementId,
589                    event.executionId,
590                    event.header.getConstrainedColumns().size(),
591                    CellCacheEvent.Source.CACHE_CONTROL));
592
593            // Remove the segment from external caches. Use an executor, because
594            // it may take some time. We discard the future, because we don't
595            // care too much if it fails.
596            final Future<?> future = event.cacheMgr.cacheExecutor.submit(
597                new Runnable() {
598                    public void run() {
599                        try {
600                            // Note that the SegmentCache API doesn't require
601                            // us to verify that the segment exists (by calling
602                            // "contains") before we call "remove".
603                            event.cacheMgr.compositeCache.remove(event.header);
604                        } catch (Throwable e) {
605                            LOGGER.warn(
606                                "remove header failed: " + event.header,
607                                e);
608                        }
609                    }
610                }
611            );
612            Util.safeGet(future, "SegmentCacheManager.segmentremoved");
613        }
614
615        public void visit(ExternalSegmentCreatedEvent event) {
616            final SegmentCacheIndex index =
617                event.cacheMgr.indexRegistry.getIndex(event.header);
618            if (index != null) {
619                index.add(event.header, false, null);
620                event.monitor.sendEvent(
621                    new CellCacheSegmentCreateEvent(
622                        event.timestamp,
623                        event.serverId,
624                        event.connectionId,
625                        event.statementId,
626                        event.executionId,
627                        event.header.getConstrainedColumns().size(),
628                        0,
629                        CellCacheEvent.Source.EXTERNAL));
630            }
631        }
632
633        public void visit(ExternalSegmentDeletedEvent event) {
634            final SegmentCacheIndex index =
635                event.cacheMgr.indexRegistry.getIndex(event.header);
636            if (index != null) {
637                index.remove(event.header);
638                event.monitor.sendEvent(
639                    new CellCacheSegmentDeleteEvent(
640                        event.timestamp,
641                        event.serverId,
642                        event.connectionId,
643                        event.statementId,
644                        event.executionId,
645                        event.header.getConstrainedColumns().size(),
646                        CellCacheEvent.Source.EXTERNAL));
647            }
648        }
649    }
650
    /**
     * Common supertype of the messages sent to the actor. Declares no
     * methods of its own.
     */
    interface Message {
    }
653
    /**
     * Message that is also a {@link Callable}, and therefore produces a
     * result of type {@code T} when called.
     */
    public static interface Command<T> extends Message, Callable<T> {
        /**
         * Returns the locus associated with this command.
         */
        Locus getLocus();
    }
657
658    /**
659     * Command to flush a particular region from cache.
660     */
661    public static final class FlushCommand implements Command<FlushResult> {
662        private final CellRegion region;
663        private final CacheControlImpl cacheControlImpl;
664        private final Locus locus;
665        private final SegmentCacheManager cacheMgr;
666
667        public FlushCommand(
668            Locus locus,
669            SegmentCacheManager mgr,
670            CellRegion region,
671            CacheControlImpl cacheControlImpl)
672        {
673            this.locus = locus;
674            this.cacheMgr = mgr;
675            this.region = region;
676            this.cacheControlImpl = cacheControlImpl;
677        }
678
679        public Locus getLocus() {
680            return locus;
681        }
682
683        public FlushResult call() throws Exception {
684            // For each measure and each star, ask the index
685            // which headers intersect.
686            final List<SegmentHeader> headers =
687                new ArrayList<SegmentHeader>();
688            final List<Member> measures =
689                CacheControlImpl.findMeasures(region);
690            final SegmentColumn[] flushRegion =
691                CacheControlImpl.findAxisValues(region);
692            final List<RolapStar> starList =
693                CacheControlImpl.getStarList(region);
694
695            for (Member member : measures) {
696                if (!(member instanceof RolapStoredMeasure)) {
697                    continue;
698                }
699                final RolapStoredMeasure storedMeasure =
700                    (RolapStoredMeasure) member;
701                final RolapStar star = storedMeasure.getCube().getStar();
702                final SegmentCacheIndex index =
703                    cacheMgr.indexRegistry.getIndex(star);
704                headers.addAll(
705                    index.intersectRegion(
706                        member.getDimension().getSchema().getName(),
707                        ((RolapSchema) member.getDimension().getSchema())
708                            .getChecksum(),
709                        storedMeasure.getCube().getName(),
710                        storedMeasure.getName(),
711                        storedMeasure.getCube().getStar()
712                            .getFactTable().getAlias(),
713                        flushRegion));
714                if (cacheControlImpl.isTraceEnabled()) {
715                    Collections.sort(
716                        headers,
717                        new Comparator<SegmentHeader>() {
718                            public int compare(
719                                SegmentHeader o1,
720                                SegmentHeader o2)
721                            {
722                                return o1.getUniqueID()
723                                    .compareTo(o2.getUniqueID());
724                            }
725                        });
726                }
727            }
728
729            // If flushRegion is empty, this means we must clear all
730            // segments for the region's measures.
731            if (flushRegion.length == 0) {
732                for (final SegmentHeader header : headers) {
733                    for (RolapStar star : starList) {
734                        cacheMgr.indexRegistry.getIndex(star).remove(header);
735                    }
736                    // Remove the segment from external caches. Use an
737                    // executor, because it may take some time. We discard
738                    // the future, because we don't care too much if it fails.
739                    cacheControlImpl.trace(
740                        "discard segment - it cannot be constrained and maintain consistency:\n"
741                        + header.getDescription());
742
743                    final Future<?> task = cacheMgr.cacheExecutor.submit(
744                        new Runnable() {
745                            public void run() {
746                                try {
747                                    // Note that the SegmentCache API doesn't
748                                    // require us to verify that the segment
749                                    // exists (by calling "contains") before we
750                                    // call "remove".
751                                    cacheMgr.compositeCache.remove(header);
752                                } catch (Throwable e) {
753                                    LOGGER.warn(
754                                        "remove header failed: " + header,
755                                        e);
756                                }
757                            }
758                        });
759                    Util.safeGet(task, "SegmentCacheManager.flush");
760                }
761                return new FlushResult(
762                    Collections.<Callable<Boolean>>emptyList());
763            }
764
765            // Now we know which headers intersect. For each of them,
766            // we append an excluded region.
767            //
768            // TODO: Optimize the logic here. If a segment is mostly
769            // empty, we should trash it completely.
770            final List<Callable<Boolean>> callableList =
771                new ArrayList<Callable<Boolean>>();
772            for (final SegmentHeader header : headers) {
773                if (!header.canConstrain(flushRegion)) {
774                    // We have to delete that segment altogether.
775                    cacheControlImpl.trace(
776                        "discard segment - it cannot be constrained and maintain consistency:\n"
777                        + header.getDescription());
778                    for (RolapStar star : starList) {
779                        cacheMgr.indexRegistry.getIndex(star).remove(header);
780                    }
781                    continue;
782                }
783                final SegmentHeader newHeader =
784                    header.constrain(flushRegion);
785                for (final SegmentCacheWorker worker
786                    : cacheMgr.segmentCacheWorkers)
787                {
788                    callableList.add(
789                        new Callable<Boolean>() {
790                            public Boolean call() throws Exception {
791                                boolean existed;
792                                if (worker.supportsRichIndex()) {
793                                    final SegmentBody sb = worker.get(header);
794                                    existed = worker.remove(header);
795                                    if (sb != null) {
796                                        worker.put(newHeader, sb);
797                                    }
798                                } else {
799                                    // The cache doesn't support rich index. We
800                                    // have to clear the segment entirely.
801                                    existed = worker.remove(header);
802                                }
803                                return existed;
804                            }
805                        });
806                }
807                for (RolapStar star : starList) {
808                    SegmentCacheIndex index =
809                        cacheMgr.indexRegistry.getIndex(star);
810                    index.remove(header);
811                    index.add(newHeader, false, null);
812                }
813            }
814
815            // Done
816            return new FlushResult(callableList);
817        }
818    }
819
820    private class PrintCacheStateCommand
821        implements SegmentCacheManager.Command<Void>
822    {
823        private final PrintWriter pw;
824        private final Locus locus;
825        private final CellRegion region;
826
827        public PrintCacheStateCommand(
828            CellRegion region,
829            PrintWriter pw,
830            Locus locus)
831        {
832            this.region = region;
833            this.pw = pw;
834            this.locus = locus;
835        }
836
837        public Void call() {
838            final List<RolapStar> starList =
839                CacheControlImpl.getStarList(region);
840            Collections.sort(
841                starList,
842                new Comparator<RolapStar>() {
843                    public int compare(RolapStar o1, RolapStar o2) {
844                        return o1.getFactTable().getAlias().compareTo(
845                            o2.getFactTable().getAlias());
846                    }
847                });
848            for (RolapStar star : starList) {
849                indexRegistry.getIndex(star)
850                    .printCacheState(pw);
851            }
852            return null;
853        }
854
855        public Locus getLocus() {
856            return locus;
857        }
858    }
859
860    /**
861     * Result of a {@link FlushCommand}. Contains a list of tasks that must
862     * be executed by the caller (or by an executor) to flush segments from the
863     * external cache(s).
864     */
865    public static class FlushResult {
866        public final List<Callable<Boolean>> tasks;
867
868        public FlushResult(List<Callable<Boolean>> tasks) {
869            this.tasks = tasks;
870        }
871    }
872
873    /**
874     * Special exception, thrown only by {@link ShutdownCommand}, telling
875     * the actor to shut down.
876     */
877    private static class PleaseShutdownException extends RuntimeException {
878        private PleaseShutdownException() {
879        }
880    }
881
882    private static class ShutdownCommand implements Command<String> {
883        public ShutdownCommand() {
884        }
885
886        public String call() throws Exception {
887            throw new PleaseShutdownException();
888        }
889
890        public Locus getLocus() {
891            return null;
892        }
893    }
894
895    private static abstract class Event implements Message {
896        /**
897         * Dispatches a call to the appropriate {@code visit} method on
898         * {@link mondrian.server.monitor.Visitor}.
899         *
900         * @param visitor Visitor
901         */
902        public abstract void acceptWithoutResponse(Visitor visitor);
903    }
904
905    /**
906     * Copy-pasted from {@link mondrian.server.monitor.Monitor}. Consider
907     * abstracting common code.
908     */
909    private static class Actor implements Runnable {
910
911        private final BlockingQueue<Pair<Handler, Message>> eventQueue =
912            new ArrayBlockingQueue<Pair<Handler, Message>>(1000);
913
914        private final BlockingHashMap<Command<?>, Pair<Object, Throwable>>
915            responseMap =
916            new BlockingHashMap<Command<?>, Pair<Object, Throwable>>(1000);
917
918        public void run() {
919            try {
920                for (;;) {
921                    final Pair<Handler, Message> entry = eventQueue.take();
922                    final Handler handler = entry.left;
923                    final Message message = entry.right;
924                    try {
925                        // A message is either a command or an event.
926                        // A command returns a value that must be read by
927                        // the caller.
928                        if (message instanceof Command<?>) {
929                            Command<?> command = (Command<?>) message;
930                            try {
931                                Locus.push(command.getLocus());
932                                Object result = command.call();
933                                responseMap.put(
934                                    command,
935                                    Pair.of(result, (Throwable) null));
936                            } catch (AbortException e) {
937                                responseMap.put(
938                                    command,
939                                    Pair.of(null, (Throwable) e));
940                            } catch (PleaseShutdownException e) {
941                                responseMap.put(
942                                    command,
943                                    Pair.of(null, (Throwable) null));
944                                return; // exit event loop
945                            } catch (Throwable e) {
946                                responseMap.put(
947                                    command,
948                                    Pair.of(null, e));
949                            } finally {
950                                Locus.pop(command.getLocus());
951                            }
952                        } else {
953                            Event event = (Event) message;
954                            event.acceptWithoutResponse(handler);
955
956                            // Broadcast the event to anyone who is interested.
957                            RolapUtil.MONITOR_LOGGER.debug(message);
958                        }
959                    } catch (Throwable e) {
960                        // REVIEW: Somewhere better to send it?
961                        e.printStackTrace();
962                    }
963                }
964            } catch (InterruptedException e) {
965                // REVIEW: Somewhere better to send it?
966                e.printStackTrace();
967            } catch (Throwable e) {
968                e.printStackTrace();
969            }
970        }
971
972        <T> T execute(Handler handler, Command<T> command) {
973            try {
974                eventQueue.put(Pair.<Handler, Message>of(handler, command));
975            } catch (InterruptedException e) {
976                throw Util.newError(e, "Exception while executing " + command);
977            }
978            try {
979                final Pair<Object, Throwable> pair =
980                    responseMap.get(command);
981                if (pair.right != null) {
982                    if (pair.right instanceof RuntimeException) {
983                        throw (RuntimeException) pair.right;
984                    } else if (pair.right instanceof Error) {
985                        throw (Error) pair.right;
986                    } else {
987                        throw new RuntimeException(pair.right);
988                    }
989                } else {
990                    return (T) pair.left;
991                }
992            } catch (InterruptedException e) {
993                throw Util.newError(e, "Exception while executing " + command);
994            }
995        }
996
997        public void event(Handler handler, Event event) {
998            try {
999                eventQueue.put(Pair.<Handler, Message>of(handler, event));
1000            } catch (InterruptedException e) {
1001                throw Util.newError(e, "Exception while executing " + event);
1002            }
1003        }
1004    }
1005
1006    private static class SegmentLoadSucceededEvent extends Event {
1007        private final SegmentHeader header;
1008        private final SegmentBody body;
1009        private final long timestamp;
1010        private final RolapStar star;
1011        private final int serverId;
1012        private final int connectionId;
1013        private final long statementId;
1014        private final long executionId;
1015        private final Monitor monitor;
1016
1017        public SegmentLoadSucceededEvent(
1018            long timestamp,
1019            Monitor monitor,
1020            int serverId,
1021            int connectionId,
1022            long statementId,
1023            long executionId,
1024            RolapStar star,
1025            SegmentHeader header,
1026            SegmentBody body)
1027        {
1028            this.timestamp = timestamp;
1029            this.monitor = monitor;
1030            this.serverId = serverId;
1031            this.connectionId = connectionId;
1032            this.statementId = statementId;
1033            this.executionId = executionId;
1034            assert header != null;
1035            assert star != null;
1036            this.star = star;
1037            this.header = header;
1038            this.body = body; // may be null
1039        }
1040
1041        public void acceptWithoutResponse(Visitor visitor) {
1042            visitor.visit(this);
1043        }
1044    }
1045
1046    private static class SegmentLoadFailedEvent extends Event {
1047        private final SegmentHeader header;
1048        private final Throwable throwable;
1049        private final long timestamp;
1050        private final RolapStar star;
1051        private final Monitor monitor;
1052        private final int serverId;
1053        private final int connectionId;
1054        private final long statementId;
1055        private final long executionId;
1056
1057        public SegmentLoadFailedEvent(
1058            long timestamp,
1059            Monitor monitor,
1060            int serverId,
1061            int connectionId,
1062            long statementId,
1063            long executionId,
1064            RolapStar star,
1065            SegmentHeader header,
1066            Throwable throwable)
1067        {
1068            this.timestamp = timestamp;
1069            this.monitor = monitor;
1070            this.serverId = serverId;
1071            this.connectionId = connectionId;
1072            this.statementId = statementId;
1073            this.executionId = executionId;
1074            this.star = star;
1075            this.throwable = throwable;
1076            assert header != null;
1077            this.header = header;
1078        }
1079
1080        public void acceptWithoutResponse(Visitor visitor) {
1081            visitor.visit(this);
1082        }
1083    }
1084
1085    private static class SegmentRemoveEvent extends Event {
1086        private final SegmentHeader header;
1087        private final long timestamp;
1088        private final Monitor monitor;
1089        private final int serverId;
1090        private final int connectionId;
1091        private final long statementId;
1092        private final long executionId;
1093        private final RolapStar star;
1094        private final SegmentCacheManager cacheMgr;
1095
1096        public SegmentRemoveEvent(
1097            long timestamp,
1098            Monitor monitor,
1099            int serverId,
1100            int connectionId,
1101            long statementId,
1102            long executionId,
1103            SegmentCacheManager cacheMgr,
1104            RolapStar star,
1105            SegmentHeader header)
1106        {
1107            this.timestamp = timestamp;
1108            this.monitor = monitor;
1109            this.serverId = serverId;
1110            this.connectionId = connectionId;
1111            this.statementId = statementId;
1112            this.executionId = executionId;
1113            this.cacheMgr = cacheMgr;
1114            this.star = star;
1115            assert header != null;
1116            this.header = header;
1117        }
1118
1119        public void acceptWithoutResponse(Visitor visitor) {
1120            visitor.visit(this);
1121        }
1122    }
1123
1124    private static class ExternalSegmentCreatedEvent extends Event {
1125        private final SegmentCacheManager cacheMgr;
1126        private final SegmentHeader header;
1127        private final long timestamp;
1128        private final Monitor monitor;
1129        private final int serverId;
1130        private final int connectionId;
1131        private final long statementId;
1132        private final long executionId;
1133
1134        public ExternalSegmentCreatedEvent(
1135            long timestamp,
1136            Monitor monitor,
1137            int serverId,
1138            int connectionId,
1139            long statementId,
1140            long executionId,
1141            SegmentCacheManager cacheMgr,
1142            SegmentHeader header)
1143        {
1144            this.timestamp = timestamp;
1145            this.monitor = monitor;
1146            this.serverId = serverId;
1147            this.connectionId = connectionId;
1148            this.statementId = statementId;
1149            this.executionId = executionId;
1150            assert header != null;
1151            assert cacheMgr != null;
1152            this.cacheMgr = cacheMgr;
1153            this.header = header;
1154        }
1155
1156        public void acceptWithoutResponse(Visitor visitor) {
1157            visitor.visit(this);
1158        }
1159    }
1160
1161    private static class ExternalSegmentDeletedEvent extends Event {
1162        private final SegmentCacheManager cacheMgr;
1163        private final SegmentHeader header;
1164        private final long timestamp;
1165        private final Monitor monitor;
1166        private final int serverId;
1167        private final int connectionId;
1168        private final long statementId;
1169        private final long executionId;
1170
1171        public ExternalSegmentDeletedEvent(
1172            long timestamp,
1173            Monitor monitor,
1174            int serverId,
1175            int connectionId,
1176            long statementId,
1177            long executionId,
1178            SegmentCacheManager cacheMgr,
1179            SegmentHeader header)
1180        {
1181            this.timestamp = timestamp;
1182            this.monitor = monitor;
1183            this.serverId = serverId;
1184            this.connectionId = connectionId;
1185            this.statementId = statementId;
1186            this.executionId = executionId;
1187            assert header != null;
1188            assert cacheMgr != null;
1189            this.cacheMgr = cacheMgr;
1190            this.header = header;
1191        }
1192
1193        public void acceptWithoutResponse(Visitor visitor) {
1194            visitor.visit(this);
1195        }
1196    }
1197
1198    /**
1199     * Implementation of SegmentCacheListener that updates the
1200     * segment index of its aggregation manager instance when it receives
1201     * events from its assigned SegmentCache implementation.
1202     */
1203    private static class AsyncCacheListener
1204        implements SegmentCache.SegmentCacheListener
1205    {
1206        private final SegmentCacheManager cacheMgr;
1207        private final MondrianServer server;
1208
1209        public AsyncCacheListener(
1210            SegmentCacheManager cacheMgr,
1211            MondrianServer server)
1212        {
1213            this.cacheMgr = cacheMgr;
1214            this.server = server;
1215        }
1216
1217        public void handle(final SegmentCacheEvent e) {
1218            if (e.isLocal()) {
1219                return;
1220            }
1221            Locus.execute(
1222                Execution.NONE,
1223                "AsyncCacheListener.handle",
1224                new Locus.Action<Void>() {
1225                    public Void execute() {
1226                        final SegmentCacheManager.Command<Void> command;
1227                        final Locus locus = Locus.peek();
1228                        switch (e.getEventType()) {
1229                        case ENTRY_CREATED:
1230                            command =
1231                                new Command<Void>() {
1232                                    public Void call() {
1233                                        cacheMgr.externalSegmentCreated(
1234                                            e.getSource(),
1235                                            server);
1236                                        return null;
1237                                    }
1238                                    public Locus getLocus() {
1239                                        return locus;
1240                                    }
1241                                };
1242                            break;
1243                        case ENTRY_DELETED:
1244                            command =
1245                                new Command<Void>() {
1246                                    public Void call() {
1247                                        cacheMgr.externalSegmentDeleted(
1248                                            e.getSource(),
1249                                            server);
1250                                        return null;
1251                                    }
1252                                    public Locus getLocus() {
1253                                        return locus;
1254                                    }
1255                                };
1256                            break;
1257                        default:
1258                            throw new UnsupportedOperationException();
1259                        }
1260                        cacheMgr.execute(command);
1261                        return null;
1262                    }
1263                });
1264        }
1265    }
1266
1267    /**
1268     * Makes a collection of {@link SegmentCacheWorker} objects (each of which
1269     * is backed by a {@link SegmentCache} appear to be a SegmentCache.
1270     *
1271     * <p>For most operations, it is easier to operate on a single cache.
1272     * It is usually clear whether operations should quit when finding the first
1273     * match, or to operate on all workers. (For example, {@link #remove} tries
1274     * to remove the segment header from all workers, and returns whether it
1275     * was removed from any of them.) This class just does what seems
1276     * most typical. If you want another behavior for a particular operation,
1277     * operate on the workers directly.</p>
1278     */
1279    static class CompositeSegmentCache implements SegmentCache {
1280        final List<SegmentCacheWorker> workers;
1281
1282        public CompositeSegmentCache(List<SegmentCacheWorker> workers) {
1283            this.workers = workers;
1284        }
1285
1286        public SegmentBody get(SegmentHeader header) {
1287            for (SegmentCacheWorker worker : workers) {
1288                final SegmentBody body = worker.get(header);
1289                if (body != null) {
1290                    return body;
1291                }
1292            }
1293            return null;
1294        }
1295
1296        public List<SegmentHeader> getSegmentHeaders() {
1297            if (MondrianProperties.instance().DisableCaching.get()) {
1298                return Collections.emptyList();
1299            }
1300            // Special case 0 and 1 workers, for which the 'union' operation
1301            // is trivial.
1302            switch (workers.size()) {
1303            case 0:
1304                return Collections.emptyList();
1305            case 1:
1306                return workers.get(0).getSegmentHeaders();
1307            default:
1308                final List<SegmentHeader> list = new ArrayList<SegmentHeader>();
1309                final Set<SegmentHeader> set = new HashSet<SegmentHeader>();
1310                for (SegmentCacheWorker worker : workers) {
1311                    for (SegmentHeader header : worker.getSegmentHeaders()) {
1312                        if (set.add(header)) {
1313                            list.add(header);
1314                        }
1315                    }
1316                }
1317                return list;
1318            }
1319        }
1320
1321        public boolean put(SegmentHeader header, SegmentBody body) {
1322            if (MondrianProperties.instance().DisableCaching.get()) {
1323                return true;
1324            }
1325            for (SegmentCacheWorker worker : workers) {
1326                worker.put(header, body);
1327            }
1328            return true;
1329        }
1330
1331        public boolean remove(SegmentHeader header) {
1332            boolean result = false;
1333            for (SegmentCacheWorker worker : workers) {
1334                if (worker.remove(header)) {
1335                    result = true;
1336                }
1337            }
1338            return result;
1339        }
1340
1341        public void tearDown() {
1342            for (SegmentCacheWorker worker : workers) {
1343                worker.shutdown();
1344            }
1345        }
1346
1347        public void addListener(SegmentCacheListener listener) {
1348            for (SegmentCacheWorker worker : workers) {
1349                worker.cache.addListener(listener);
1350            }
1351        }
1352
1353        public void removeListener(SegmentCacheListener listener) {
1354            for (SegmentCacheWorker worker : workers) {
1355                worker.cache.removeListener(listener);
1356            }
1357        }
1358
1359        public boolean supportsRichIndex() {
1360            for (SegmentCacheWorker worker : workers) {
1361                if (!worker.supportsRichIndex()) {
1362                    return false;
1363                }
1364            }
1365            return true;
1366        }
1367    }
1368
1369    /**
1370     * Locates segments in the cache that satisfy a given request.
1371     *
1372     * <p>The result consists of (a) a list of segment headers, (b) a list
1373     * of futures for segment bodies that are currently being loaded, (c)
1374     * converters to convert headers into {@link SegmentWithData}.</p>
1375     *
1376     * <p>For (a), the client should call the cache to get the body for each
1377     * segment header; it is possible that there is no body in the cache.
1378     * For (b), the client will have to wait for the segment to arrive.</p>
1379     */
1380    private class PeekCommand
1381        implements SegmentCacheManager.Command<PeekResponse>
1382    {
1383        private final CellRequest request;
1384        private final Locus locus;
1385
1386        /**
1387         * Creates a PeekCommand.
1388         *
1389         * @param request Cell request
1390         * @param locus Locus
1391         */
1392        public PeekCommand(
1393            CellRequest request,
1394            Locus locus)
1395        {
1396            this.request = request;
1397            this.locus = locus;
1398        }
1399
1400        public PeekResponse call() {
1401            final RolapStar.Measure measure = request.getMeasure();
1402            final RolapStar star = measure.getStar();
1403            final RolapSchema schema = star.getSchema();
1404            final AggregationKey key = new AggregationKey(request);
1405            final List<SegmentHeader> headers =
1406                indexRegistry.getIndex(star)
1407                    .locate(
1408                        schema.getName(),
1409                        schema.getChecksum(),
1410                        measure.getCubeName(),
1411                        measure.getName(),
1412                        star.getFactTable().getAlias(),
1413                        request.getConstrainedColumnsBitKey(),
1414                        request.getMappedCellValues(),
1415                        AggregationKey.getCompoundPredicateStringList(
1416                            star,
1417                            key.getCompoundPredicateList()));
1418
1419            final Map<SegmentHeader, Future<SegmentBody>> headerMap =
1420                new HashMap<SegmentHeader, Future<SegmentBody>>();
1421            final Map<List, SegmentBuilder.SegmentConverter> converterMap =
1422                new HashMap<List, SegmentBuilder.SegmentConverter>();
1423
1424            // Is there a pending segment? (A segment that has been created and
1425            // is loading via SQL.)
1426            for (final SegmentHeader header : headers) {
1427                final Future<SegmentBody> bodyFuture =
1428                    indexRegistry.getIndex(star)
1429                        .getFuture(locus.execution, header);
1430                if (bodyFuture != null) {
1431                    // Check if the DataSourceChangeListener wants us to clear
1432                    // the current segment
1433                    if (star.getChangeListener() != null
1434                        && star.getChangeListener().isAggregationChanged(key))
1435                    {
1436                        // We can't satisfy this request, and we must clear the
1437                        // data from our cache. This must be in sync with the
1438                        // actor thread to maintain consistency.
1439                        indexRegistry.getIndex(star).remove(header);
1440                        Util.safeGet(
1441                            cacheExecutor.submit(
1442                                new Runnable() {
1443                                    public void run() {
1444                                        try {
1445                                            compositeCache.remove(header);
1446                                        } catch (Throwable e) {
1447                                            LOGGER.warn(
1448                                                "remove header failed: "
1449                                                + header,
1450                                                e);
1451                                        }
1452                                    }
1453                                }),
1454                            "SegmentCacheManager.peek");
1455                        continue;
1456                    }
1457                    converterMap.put(
1458                        SegmentCacheIndexImpl.makeConverterKey(header),
1459                        getConverter(star, header));
1460                    headerMap.put(
1461                        header, bodyFuture);
1462                }
1463            }
1464
1465            return new PeekResponse(headerMap, converterMap);
1466        }
1467
1468        public Locus getLocus() {
1469            return locus;
1470        }
1471    }
1472
1473    private static class PeekResponse {
1474        public final Map<SegmentHeader, Future<SegmentBody>> headerMap;
1475        public final Map<List, SegmentBuilder.SegmentConverter> converterMap;
1476
1477        public PeekResponse(
1478            Map<SegmentHeader, Future<SegmentBody>> headerMap,
1479            Map<List, SegmentBuilder.SegmentConverter> converterMap)
1480        {
1481            this.headerMap = headerMap;
1482            this.converterMap = converterMap;
1483        }
1484    }
1485
1486    /**
1487     * Registry of all the indexes that were created for this
1488     * cache manager, per {@link RolapStar}.
1489     */
1490    public class SegmentCacheIndexRegistry {
1491        private final Map<RolapStar, SegmentCacheIndex> indexes =
1492            new WeakHashMap<RolapStar, SegmentCacheIndex>();
1493        /**
1494         * Returns the {@link SegmentCacheIndex} for a given
1495         * {@link RolapStar}.
1496         */
1497        public SegmentCacheIndex getIndex(RolapStar star) {
1498            if (!indexes.containsKey(star)) {
1499                indexes.put(star, new SegmentCacheIndexImpl(thread));
1500            }
1501            return indexes.get(star);
1502        }
1503        /**
1504         * Returns the {@link SegmentCacheIndex} for a given
1505         * {@link SegmentHeader}.
1506         */
1507        private SegmentCacheIndex getIndex(
1508            SegmentHeader header)
1509        {
1510            // First we check the indexes that already exist.
1511            // This is fast.
1512            for (Entry<RolapStar, SegmentCacheIndex> entry
1513                : indexes.entrySet())
1514            {
1515                final String factTableName =
1516                    entry.getKey().getFactTable().getTableName();
1517                final ByteString schemaChecksum =
1518                    entry.getKey().getSchema().getChecksum();
1519                if (!factTableName.equals(header.rolapStarFactTableName)) {
1520                    continue;
1521                }
1522                if (!schemaChecksum.equals(header.schemaChecksum)) {
1523                    continue;
1524                }
1525                return entry.getValue();
1526            }
1527            // The index doesn't exist. Let's create it.
1528            for (RolapSchema schema : RolapSchema.getRolapSchemas()) {
1529                if (!schema.getChecksum().equals(header.schemaChecksum)) {
1530                    continue;
1531                }
1532                // We have a schema match.
1533                RolapStar star =
1534                    schema.getStar(header.rolapStarFactTableName);
1535                return getIndex(star);
1536            }
1537            return null;
1538        }
1539        public void cancelExecutionSegments(Execution exec) {
1540            for (SegmentCacheIndex index : indexes.values()) {
1541                index.cancel(exec);
1542            }
1543        }
1544    }
1545
1546    /**
1547     * Exception which someone can throw to indicate to the Actor that
1548     * whatever it was doing is not needed anymore. Won't trigger any output
1549     * to the logs.
1550     *
1551     * <p>If your {@link Command} throws this, it will be sent back at you.
1552     * You must handle it.
1553     */
1554    public static final class AbortException extends RuntimeException {
1555        private static final long serialVersionUID = 1L;
1556    };
1557}
1558
1559// End SegmentCacheManager.java