001/*
002// This software is subject to the terms of the Eclipse Public License v1.0
003// Agreement, available at the following URL:
004// http://www.eclipse.org/legal/epl-v10.html.
005// You must accept the terms of that agreement to use this software.
006//
007// Copyright (C) 2007-2013 Pentaho and others
008// All Rights Reserved.
009*/
010package mondrian.rolap;
011
012import mondrian.olap.Util;
013import mondrian.olap.Util.Functor1;
014import mondrian.server.Execution;
015import mondrian.server.Locus;
016import mondrian.server.monitor.*;
017import mondrian.server.monitor.SqlStatementEvent.Purpose;
018import mondrian.spi.Dialect;
019import mondrian.util.*;
020
021import org.apache.log4j.Logger;
022
023import java.lang.reflect.InvocationTargetException;
024import java.lang.reflect.Proxy;
025import java.sql.*;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.concurrent.atomic.AtomicLong;
029import javax.sql.DataSource;
030
031/**
032 * SqlStatement contains a SQL statement and associated resources throughout
033 * its lifetime.
034 *
035 * <p>The goal of SqlStatement is to make tracing, error-handling and
036 * resource-management easier. None of the methods throws a SQLException;
037 * if an error occurs in one of the methods, the method wraps the exception
038 * in a {@link RuntimeException} describing the high-level operation, logs
039 * that the operation failed, and throws that RuntimeException.
040 *
041 * <p>If methods succeed, the method generates lifecycle logging such as
042 * the elapsed time and number of rows fetched.
043 *
044 * <p>There are a few obligations on the caller. The caller must:<ul>
045 * <li>call the {@link #handle(Throwable)} method if one of the contained
046 *     objects (say the {@link java.sql.ResultSet}) gives an error;
047 * <li>call the {@link #close()} method if all operations complete
048 *     successfully.
049 * <li>increment the {@link #rowCount} field each time a row is fetched.
050 * </ul>
051 *
052 * <p>The {@link #close()} method is idempotent. You are welcome to call it
053 * more than once.
054 *
055 * <p>SqlStatement is not thread-safe.
056 *
057 * @author jhyde
058 * @since 2.3
059 */
060public class SqlStatement {
061    private static final Logger LOG = Logger.getLogger(SqlStatement.class);
062    private static final String TIMING_NAME = "SqlStatement-";
063
064    // used for SQL logging, allows for a SQL Statement UID
065    private static final AtomicLong ID_GENERATOR = new AtomicLong();
066
067    private static final RolapUtil.Semaphore querySemaphore =
068        RolapUtil.getQuerySemaphore();
069
070    private final DataSource dataSource;
071    private Connection jdbcConnection;
072    private ResultSet resultSet;
073    private final String sql;
074    private final List<Type> types;
075    private final int maxRows;
076    private final int firstRowOrdinal;
077    private final Locus locus;
078    private final int resultSetType;
079    private final int resultSetConcurrency;
080    private boolean haveSemaphore;
081    public int rowCount;
082    private long startTimeNanos;
083    private long startTimeMillis;
084    private final List<Accessor> accessors = new ArrayList<Accessor>();
085    private State state = State.FRESH;
086    private final long id;
087    private Functor1<Void, Statement> callback;
088
089    /**
090     * Creates a SqlStatement.
091     *
092     * @param dataSource Data source
093     * @param sql SQL
094     * @param types Suggested types of columns, or null;
095     *     if present, must have one element for each SQL column;
096     *     each not-null entry overrides deduced JDBC type of the column
097     * @param maxRows Maximum rows; <= 0 means no maximum
098     * @param firstRowOrdinal Ordinal of first row to skip to; <= 0 do not skip
099     * @param locus Execution context of this statement
100     * @param resultSetType Result set type
101     * @param resultSetConcurrency Result set concurrency
102     */
103    public SqlStatement(
104        DataSource dataSource,
105        String sql,
106        List<Type> types,
107        int maxRows,
108        int firstRowOrdinal,
109        Locus locus,
110        int resultSetType,
111        int resultSetConcurrency,
112        Util.Functor1<Void, Statement> callback)
113    {
114        this.callback = callback;
115        this.id = ID_GENERATOR.getAndIncrement();
116        this.dataSource = dataSource;
117        this.sql = sql;
118        this.types = types;
119        this.maxRows = maxRows;
120        this.firstRowOrdinal = firstRowOrdinal;
121        this.locus = locus;
122        this.resultSetType = resultSetType;
123        this.resultSetConcurrency = resultSetConcurrency;
124    }
125
126    /**
127     * Executes the current statement, and handles any SQLException.
128     */
129    public void execute() {
130        assert state == State.FRESH : "cannot re-execute";
131        state = State.ACTIVE;
132        Counters.SQL_STATEMENT_EXECUTE_COUNT.incrementAndGet();
133        Counters.SQL_STATEMENT_EXECUTING_IDS.add(id);
134        String status = "failed";
135        Statement statement = null;
136        try {
137            // Check execution state
138            locus.execution.checkCancelOrTimeout();
139
140            this.jdbcConnection = dataSource.getConnection();
141            querySemaphore.enter();
142            haveSemaphore = true;
143            // Trace start of execution.
144            if (RolapUtil.SQL_LOGGER.isDebugEnabled()) {
145                StringBuilder sqllog = new StringBuilder();
146                sqllog.append(id)
147                    .append(": ")
148                    .append(locus.component)
149                    .append(": executing sql [");
150                if (sql.indexOf('\n') >= 0) {
151                    // SQL appears to be formatted as multiple lines. Make it
152                    // start on its own line.
153                    sqllog.append("\n");
154                }
155                sqllog.append(sql);
156                sqllog.append(']');
157                RolapUtil.SQL_LOGGER.debug(sqllog.toString());
158            }
159
160            // Execute hook.
161            RolapUtil.ExecuteQueryHook hook = RolapUtil.getHook();
162            if (hook != null) {
163                hook.onExecuteQuery(sql);
164            }
165
166            // Check execution state
167            locus.execution.checkCancelOrTimeout();
168
169            startTimeNanos = System.nanoTime();
170            startTimeMillis = System.currentTimeMillis();
171
172            if (resultSetType < 0 || resultSetConcurrency < 0) {
173                statement = jdbcConnection.createStatement();
174            } else {
175                statement = jdbcConnection.createStatement(
176                    resultSetType,
177                    resultSetConcurrency);
178            }
179            if (maxRows > 0) {
180                statement.setMaxRows(maxRows);
181            }
182
183            // First make sure to register with the execution instance.
184            if (getPurpose() != Purpose.CELL_SEGMENT) {
185                locus.execution.registerStatement(locus, statement);
186            } else {
187                if (callback != null) {
188                    callback.apply(statement);
189                }
190            }
191
192            locus.getServer().getMonitor().sendEvent(
193                new SqlStatementStartEvent(
194                    startTimeMillis,
195                    id,
196                    locus,
197                    sql,
198                    getPurpose(),
199                    getCellRequestCount()));
200
201            this.resultSet = statement.executeQuery(sql);
202
203            // skip to first row specified in request
204            this.state = State.ACTIVE;
205            if (firstRowOrdinal > 0) {
206                if (resultSetType == ResultSet.TYPE_FORWARD_ONLY) {
207                    for (int i = 0; i < firstRowOrdinal; ++i) {
208                        if (!this.resultSet.next()) {
209                            this.state = State.DONE;
210                            break;
211                        }
212                    }
213                } else {
214                    if (!this.resultSet.absolute(firstRowOrdinal)) {
215                        this.state = State.DONE;
216                    }
217                }
218            }
219
220            long timeMillis = System.currentTimeMillis();
221            long timeNanos = System.nanoTime();
222            final long executeNanos = timeNanos - startTimeNanos;
223            final long executeMillis = executeNanos / 1000000;
224            Util.addDatabaseTime(executeMillis);
225            status = ", exec " + executeMillis + " ms";
226
227            locus.getServer().getMonitor().sendEvent(
228                new SqlStatementExecuteEvent(
229                    timeMillis,
230                    id,
231                    locus,
232                    sql,
233                    getPurpose(),
234                    executeNanos));
235
236            // Compute accessors. They ensure that we use the most efficient
237            // method (e.g. getInt, getDouble, getObject) for the type of the
238            // column. Even if you are going to box the result into an object,
239            // it is better to use getInt than getObject; the latter might
240            // return something daft like a BigDecimal (does, on the Oracle JDBC
241            // driver).
242            accessors.clear();
243            for (Type type : guessTypes()) {
244                accessors.add(createAccessor(accessors.size(), type));
245            }
246        } catch (Throwable e) {
247            status = ", failed (" + e + ")";
248
249            // This statement was leaked to us. It is our responsibility
250            // to dispose of it.
251            Util.close(null, statement, null);
252
253            // Now handle this exception.
254            throw handle(e);
255        } finally {
256            RolapUtil.SQL_LOGGER.debug(id + ": " + status);
257
258            if (RolapUtil.LOGGER.isDebugEnabled()) {
259                RolapUtil.LOGGER.debug(
260                    locus.component + ": executing sql [" + sql + "]" + status);
261            }
262        }
263    }
264
265    /**
266     * Closes all resources (statement, result set) held by this
267     * SqlStatement.
268     *
269     * <p>If any of them fails, wraps them in a
270     * {@link RuntimeException} describing the high-level operation which
271     * this statement was performing. No further error-handling is required
272     * to produce a descriptive stack trace, unless you want to absorb the
273     * error.</p>
274     *
275     * <p>This method is idempotent.</p>
276     */
277    public void close() {
278        if (state == State.CLOSED) {
279            return;
280        }
281        state = State.CLOSED;
282
283        if (haveSemaphore) {
284            haveSemaphore = false;
285            querySemaphore.leave();
286        }
287
288        // According to the JDBC spec, closing a statement automatically closes
289        // its result sets, and closing a connection automatically closes its
290        // statements. But let's be conservative and close everything
291        // explicitly.
292        SQLException ex = Util.close(resultSet, null, jdbcConnection);
293        resultSet = null;
294        jdbcConnection = null;
295
296        if (ex != null) {
297            throw Util.newError(
298                ex,
299                locus.message + "; sql=[" + sql + "]");
300        }
301
302        long endTime = System.currentTimeMillis();
303        long totalMs = endTime - startTimeMillis;
304        String status =
305            ", exec+fetch " + totalMs + " ms, " + rowCount + " rows";
306
307        locus.execution.getQueryTiming().markFull(
308            TIMING_NAME + locus.component, totalMs);
309
310        RolapUtil.SQL_LOGGER.debug(id + ": " + status);
311
312        Counters.SQL_STATEMENT_CLOSE_COUNT.incrementAndGet();
313        boolean remove = Counters.SQL_STATEMENT_EXECUTING_IDS.remove(id);
314        status += ", ex=" + Counters.SQL_STATEMENT_EXECUTE_COUNT.get()
315            + ", close=" + Counters.SQL_STATEMENT_CLOSE_COUNT.get()
316            + ", open=" + Counters.SQL_STATEMENT_EXECUTING_IDS;
317
318        if (RolapUtil.LOGGER.isDebugEnabled()) {
319            RolapUtil.LOGGER.debug(
320                locus.component + ": done executing sql [" + sql + "]"
321                + status);
322        }
323
324        if (!remove) {
325            throw new AssertionError(
326                "SqlStatement closed that was never executed: " + id);
327        }
328
329        locus.getServer().getMonitor().sendEvent(
330            new SqlStatementEndEvent(
331                endTime,
332                id,
333                locus,
334                sql,
335                getPurpose(),
336                rowCount,
337                false,
338                null));
339    }
340
341    public ResultSet getResultSet() {
342        return resultSet;
343    }
344
345    /**
346     * Handles an exception thrown from the ResultSet, implicitly calls
347     * {@link #close}, and returns an exception which includes the full
348     * stack, including a description of the high-level operation.
349     *
350     * @param e Exception
351     * @return Runtime exception
352     */
353    public RuntimeException handle(Throwable e) {
354        RuntimeException runtimeException =
355            Util.newError(e, locus.message + "; sql=[" + sql + "]");
356        try {
357            close();
358        } catch (Throwable t) {
359            // ignore
360        }
361        return runtimeException;
362    }
363
364    private Accessor createAccessor(int column, Type type) {
365        final int columnPlusOne = column + 1;
366        switch (type) {
367        case OBJECT:
368            return new Accessor() {
369                public Object get() throws SQLException {
370                    return resultSet.getObject(columnPlusOne);
371                }
372            };
373        case STRING:
374            return new Accessor() {
375                public Object get() throws SQLException {
376                    return resultSet.getString(columnPlusOne);
377                }
378            };
379        case INT:
380            return new Accessor() {
381                public Object get() throws SQLException {
382                    final int val = resultSet.getInt(columnPlusOne);
383                    if (val == 0 && resultSet.wasNull()) {
384                        return null;
385                    }
386                    return val;
387                }
388            };
389        case LONG:
390            return new Accessor() {
391                public Object get() throws SQLException {
392                    final long val = resultSet.getLong(columnPlusOne);
393                    if (val == 0 && resultSet.wasNull()) {
394                        return null;
395                    }
396                    return val;
397                }
398            };
399        case DOUBLE:
400            return new Accessor() {
401                public Object get() throws SQLException {
402                    final double val = resultSet.getDouble(columnPlusOne);
403                    if (val == 0 && resultSet.wasNull()) {
404                        return null;
405                    }
406                    return val;
407                }
408            };
409        default:
410            throw Util.unexpected(type);
411        }
412    }
413
414    public List<Type> guessTypes() throws SQLException {
415        final ResultSetMetaData metaData = resultSet.getMetaData();
416        final int columnCount = metaData.getColumnCount();
417        assert this.types == null || this.types.size() == columnCount;
418        List<Type> types = new ArrayList<Type>();
419
420        for (int i = 0; i < columnCount; i++) {
421            final Type suggestedType =
422                this.types == null ? null : this.types.get(i);
423            // There might not be a schema constructed yet,
424            // so watch out here for NPEs.
425            RolapSchema schema = locus.execution.getMondrianStatement()
426                .getMondrianConnection()
427                .getSchema();
428
429            if (suggestedType != null) {
430                types.add(suggestedType);
431            } else if (schema != null && schema.getDialect() != null) {
432                types.add(schema.getDialect().getType(metaData, i));
433            } else {
434                types.add(Type.OBJECT);
435            }
436        }
437        return types;
438    }
439
440    public List<Accessor> getAccessors() throws SQLException {
441        return accessors;
442    }
443
444    /**
445     * Returns the result set in a proxy which automatically closes this
446     * SqlStatement (and hence also the statement and result set) when the
447     * result set is closed.
448     *
449     * <p>This helps to prevent connection leaks. The caller still has to
450     * remember to call ResultSet.close(), of course.
451     *
452     * @return Wrapped result set
453     */
454    public ResultSet getWrappedResultSet() {
455        return (ResultSet) Proxy.newProxyInstance(
456            null,
457            new Class<?>[] {ResultSet.class},
458            new MyDelegatingInvocationHandler(this));
459    }
460
461    private SqlStatementEvent.Purpose getPurpose() {
462        if (locus instanceof StatementLocus) {
463            return ((StatementLocus) locus).purpose;
464        } else {
465            return SqlStatementEvent.Purpose.OTHER;
466        }
467    }
468
469    private int getCellRequestCount() {
470        if (locus instanceof StatementLocus) {
471            return ((StatementLocus) locus).cellRequestCount;
472        } else {
473            return 0;
474        }
475    }
476
477    /**
478     * The approximate JDBC type of a column.
479     *
480     * <p>This type affects which {@link ResultSet} method we use to get values
481     * of this column: the default is {@link java.sql.ResultSet#getObject(int)},
482     * but we'd prefer to use native values {@code getInt} and {@code getDouble}
483     * if possible.
484     */
485    public enum Type {
486        OBJECT,
487        DOUBLE,
488        INT,
489        LONG,
490        STRING;
491
492        public Object get(ResultSet resultSet, int column) throws SQLException {
493            switch (this) {
494            case OBJECT:
495                return resultSet.getObject(column + 1);
496            case STRING:
497                return resultSet.getString(column + 1);
498            case INT:
499                return resultSet.getInt(column + 1);
500            case LONG:
501                return resultSet.getLong(column + 1);
502            case DOUBLE:
503                return resultSet.getDouble(column + 1);
504            default:
505                throw Util.unexpected(this);
506            }
507        }
508    }
509
510    public interface Accessor {
511        Object get() throws SQLException;
512    }
513
514    /**
515     * Reflectively implements the {@link ResultSet} interface by routing method
516     * calls to the result set inside a {@link mondrian.rolap.SqlStatement}.
517     * When the result set is closed, so is the SqlStatement, and hence the
518     * JDBC connection and statement also.
519     */
520    // must be public for reflection to work
521    public static class MyDelegatingInvocationHandler
522        extends DelegatingInvocationHandler
523    {
524        private final SqlStatement sqlStatement;
525
526        /**
527         * Creates a MyDelegatingInvocationHandler.
528         *
529         * @param sqlStatement SQL statement
530         */
531        MyDelegatingInvocationHandler(SqlStatement sqlStatement) {
532            this.sqlStatement = sqlStatement;
533        }
534
535        protected Object getTarget() throws InvocationTargetException {
536            final ResultSet resultSet = sqlStatement.getResultSet();
537            if (resultSet == null) {
538                throw new InvocationTargetException(
539                    new SQLException(
540                        "Invalid operation. Statement is closed."));
541            }
542            return resultSet;
543        }
544
545        /**
546         * Helper method to implement {@link java.sql.ResultSet#close()}.
547         *
548         * @throws SQLException on error
549         */
550        public void close() throws SQLException {
551            sqlStatement.close();
552        }
553    }
554
555    private enum State {
556        FRESH,
557        ACTIVE,
558        DONE,
559        CLOSED
560    }
561
562    public static class StatementLocus extends Locus {
563        private final SqlStatementEvent.Purpose purpose;
564        private final int cellRequestCount;
565
566        public StatementLocus(
567            Execution execution,
568            String component,
569            String message,
570            SqlStatementEvent.Purpose purpose,
571            int cellRequestCount)
572        {
573            super(
574                execution,
575                component,
576                message);
577            this.purpose = purpose;
578            this.cellRequestCount = cellRequestCount;
579        }
580    }
581}
582
583// End SqlStatement.java