View Javadoc

1   /*
2    * Copyright (c) 2011.  The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.hbql.impl;
22  
23  import org.apache.expreval.client.InternalErrorException;
24  import org.apache.hadoop.hbase.hbql.client.ExecutionResults;
25  import org.apache.hadoop.hbase.hbql.client.HBqlException;
26  import org.apache.hadoop.hbase.hbql.client.HRecord;
27  import org.apache.hadoop.hbase.hbql.client.HResultSet;
28  import org.apache.hadoop.hbase.hbql.client.HStatement;
29  import org.apache.hadoop.hbase.hbql.client.QueryFuture;
30  import org.apache.hadoop.hbase.hbql.client.QueryListener;
31  import org.apache.hadoop.hbase.hbql.statement.ConnectionStatement;
32  import org.apache.hadoop.hbase.hbql.statement.HBqlStatement;
33  import org.apache.hadoop.hbase.hbql.statement.NonConnectionStatement;
34  import org.apache.hadoop.hbase.hbql.statement.SelectStatement;
35  import org.apache.hadoop.hbase.hbql.util.Lists;
36  
37  import java.util.List;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  
40  
41  public class HStatementImpl implements HStatement {
42  
43      private final AtomicBoolean atomicClosed        = new AtomicBoolean(false);
44      private final AtomicBoolean ignoreQueryExecutor = new AtomicBoolean(false);
45      private final HConnectionImpl connection;
46      private HResultSet resultSet = null;
47  
48      public HStatementImpl(final HConnectionImpl conn) {
49          this.connection = conn;
50      }
51  
52      protected HConnectionImpl getHConnectionImpl() {
53          return this.connection;
54      }
55  
56      public <T> HResultSet<T> getResultSet() {
57          return (HResultSet<T>)this.resultSet;
58      }
59  
60      public boolean getIgnoreQueryExecutor() {
61          return this.ignoreQueryExecutor.get();
62      }
63  
64      public void setIgnoreQueryExecutor(final boolean ignoreQueryExecutor) {
65          this.ignoreQueryExecutor.set(ignoreQueryExecutor);
66      }
67  
68      public ExecutionResults executeUpdate(final HBqlStatement statement) throws HBqlException {
69  
70          if (Utils.isSelectStatement(statement)) {
71              throw new HBqlException("executeUpdate() requires a non-SELECT statement");
72          }
73          else if (Utils.isConnectionStatemet(statement)) {
74              return ((ConnectionStatement)statement).evaluatePredicateAndExecute(this.getHConnectionImpl());
75          }
76          else if (Utils.isNonConectionStatemet(statement)) {
77              return ((NonConnectionStatement)statement).execute();
78          }
79          else {
80              throw new InternalErrorException("Bad state with " + statement.getClass().getSimpleName());
81          }
82      }
83  
84      protected <T> HResultSet<T> executeQuery(final HBqlStatement statement,
85                                               final Class clazz,
86                                               final QueryListener<T>... listeners) throws HBqlException {
87  
88          if (!Utils.isSelectStatement(statement))
89              throw new HBqlException("executeQuery() requires a SELECT statement");
90  
91          final Query<T> query = Query.newQuery(this.getHConnectionImpl(), (SelectStatement)statement, clazz, listeners);
92  
93          this.resultSet = query.newResultSet(this.getIgnoreQueryExecutor());
94  
95          return this.resultSet;
96      }
97  
98      protected <T> QueryFuture executeQueryAsync(final HBqlStatement statement,
99                                                  final Class clazz,
100                                                 final QueryListener<T>... listeners) throws HBqlException {
101 
102         if (!Utils.isSelectStatement(statement))
103             throw new HBqlException("executeQueryAsync() requires a SELECT statement");
104 
105         final AsyncExecutorImpl asyncExecutor = this.getHConnectionImpl().getAsyncExecutorForConnection();
106 
107         final Query<T> query = Query.newQuery(this.getHConnectionImpl(), (SelectStatement)statement, clazz, listeners);
108 
109         return asyncExecutor.submit(
110                 new AsyncRunnable() {
111                     public void run() {
112                         try {
113                             final HResultSet<T> rs = query.newResultSet(false);
114                             for (T rec : rs) {
115                                 // Iterate through the results
116                             }
117                         }
118                         catch (HBqlException e) {
119                             e.printStackTrace();
120                             this.getQueryFuture().setCaughtException(e);
121                         }
122                     }
123                 });
124     }
125 
126     protected <T> List<T> executeQueryAndFetch(final HBqlStatement statement,
127                                                final Class clazz,
128                                                final QueryListener<T>... listeners) throws HBqlException {
129 
130         final List<T> retval = Lists.newArrayList();
131 
132         HResultSet<T> results = null;
133 
134         try {
135             results = this.executeQuery(statement, clazz, listeners);
136 
137             for (final T val : results)
138                 retval.add(val);
139         }
140         finally {
141             if (results != null)
142                 results.close();
143         }
144 
145         return retval;
146     }
147 
148     protected ExecutionResults execute(final HBqlStatement statement) throws HBqlException {
149         if (Utils.isSelectStatement(statement)) {
150             this.executeQuery(statement, null);
151             return new ExecutionResults("Query executed");
152         }
153         else {
154             return this.executeUpdate(statement);
155         }
156     }
157 
158     public ExecutionResults execute(final String sql) throws HBqlException {
159         return this.execute(Utils.parseHBqlStatement(sql));
160     }
161 
162     public HResultSet<HRecord> executeQuery(final String sql,
163                                             final QueryListener<HRecord>... listeners) throws HBqlException {
164         return this.executeQuery(Utils.parseHBqlStatement(sql), HRecord.class, listeners);
165     }
166 
167     public QueryFuture executeQueryAsync(final String sql,
168                                          final QueryListener<HRecord>... listeners) throws HBqlException {
169         return this.executeQueryAsync(Utils.parseHBqlStatement(sql), HRecord.class, listeners);
170     }
171 
172     public <T> HResultSet<T> executeQuery(final String sql,
173                                           final Class clazz,
174                                           final QueryListener<T>... listeners) throws HBqlException {
175         return this.executeQuery(Utils.parseHBqlStatement(sql), clazz, listeners);
176     }
177 
178     public <T> QueryFuture executeQueryAsync(final String sql,
179                                              final Class clazz,
180                                              final QueryListener<T>... listeners) throws HBqlException {
181         return this.executeQueryAsync(Utils.parseHBqlStatement(sql), clazz, listeners);
182     }
183 
184     public List<HRecord> executeQueryAndFetch(final String sql,
185                                               final QueryListener<HRecord>... listeners) throws HBqlException {
186         return this.executeQueryAndFetch(Utils.parseHBqlStatement(sql), HRecord.class, listeners);
187     }
188 
189     public <T> List<T> executeQueryAndFetch(final String sql,
190                                             final Class clazz,
191                                             final QueryListener<T>... listeners) throws HBqlException {
192         return this.executeQueryAndFetch(Utils.parseHBqlStatement(sql), clazz, listeners);
193     }
194 
195     public ExecutionResults executeUpdate(final String sql) throws HBqlException {
196         return this.executeUpdate(Utils.parseHBqlStatement(sql));
197     }
198 
199     private AtomicBoolean getAtomicClosed() {
200         return this.atomicClosed;
201     }
202 
203     public boolean isClosed() {
204         return this.getAtomicClosed().get();
205     }
206 
207     public synchronized void close() throws HBqlException {
208         if (!this.isClosed()) {
209             if (this.getResultSet() != null)
210                 this.getResultSet().close();
211 
212             this.getAtomicClosed().set(true);
213         }
214     }
215 }