1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.hbql.impl;
22
23 import org.apache.expreval.client.InternalErrorException;
24 import org.apache.hadoop.hbase.hbql.client.ExecutionResults;
25 import org.apache.hadoop.hbase.hbql.client.HBqlException;
26 import org.apache.hadoop.hbase.hbql.client.HRecord;
27 import org.apache.hadoop.hbase.hbql.client.HResultSet;
28 import org.apache.hadoop.hbase.hbql.client.HStatement;
29 import org.apache.hadoop.hbase.hbql.client.QueryFuture;
30 import org.apache.hadoop.hbase.hbql.client.QueryListener;
31 import org.apache.hadoop.hbase.hbql.statement.ConnectionStatement;
32 import org.apache.hadoop.hbase.hbql.statement.HBqlStatement;
33 import org.apache.hadoop.hbase.hbql.statement.NonConnectionStatement;
34 import org.apache.hadoop.hbase.hbql.statement.SelectStatement;
35 import org.apache.hadoop.hbase.hbql.util.Lists;
36
37 import java.util.List;
38 import java.util.concurrent.atomic.AtomicBoolean;
39
40
41 public class HStatementImpl implements HStatement {
42
43 private final AtomicBoolean atomicClosed = new AtomicBoolean(false);
44 private final AtomicBoolean ignoreQueryExecutor = new AtomicBoolean(false);
45 private final HConnectionImpl connection;
46 private HResultSet resultSet = null;
47
48 public HStatementImpl(final HConnectionImpl conn) {
49 this.connection = conn;
50 }
51
52 protected HConnectionImpl getHConnectionImpl() {
53 return this.connection;
54 }
55
56 public <T> HResultSet<T> getResultSet() {
57 return (HResultSet<T>)this.resultSet;
58 }
59
60 public boolean getIgnoreQueryExecutor() {
61 return this.ignoreQueryExecutor.get();
62 }
63
64 public void setIgnoreQueryExecutor(final boolean ignoreQueryExecutor) {
65 this.ignoreQueryExecutor.set(ignoreQueryExecutor);
66 }
67
68 public ExecutionResults executeUpdate(final HBqlStatement statement) throws HBqlException {
69
70 if (Utils.isSelectStatement(statement)) {
71 throw new HBqlException("executeUpdate() requires a non-SELECT statement");
72 }
73 else if (Utils.isConnectionStatemet(statement)) {
74 return ((ConnectionStatement)statement).evaluatePredicateAndExecute(this.getHConnectionImpl());
75 }
76 else if (Utils.isNonConectionStatemet(statement)) {
77 return ((NonConnectionStatement)statement).execute();
78 }
79 else {
80 throw new InternalErrorException("Bad state with " + statement.getClass().getSimpleName());
81 }
82 }
83
84 protected <T> HResultSet<T> executeQuery(final HBqlStatement statement,
85 final Class clazz,
86 final QueryListener<T>... listeners) throws HBqlException {
87
88 if (!Utils.isSelectStatement(statement))
89 throw new HBqlException("executeQuery() requires a SELECT statement");
90
91 final Query<T> query = Query.newQuery(this.getHConnectionImpl(), (SelectStatement)statement, clazz, listeners);
92
93 this.resultSet = query.newResultSet(this.getIgnoreQueryExecutor());
94
95 return this.resultSet;
96 }
97
98 protected <T> QueryFuture executeQueryAsync(final HBqlStatement statement,
99 final Class clazz,
100 final QueryListener<T>... listeners) throws HBqlException {
101
102 if (!Utils.isSelectStatement(statement))
103 throw new HBqlException("executeQueryAsync() requires a SELECT statement");
104
105 final AsyncExecutorImpl asyncExecutor = this.getHConnectionImpl().getAsyncExecutorForConnection();
106
107 final Query<T> query = Query.newQuery(this.getHConnectionImpl(), (SelectStatement)statement, clazz, listeners);
108
109 return asyncExecutor.submit(
110 new AsyncRunnable() {
111 public void run() {
112 try {
113 final HResultSet<T> rs = query.newResultSet(false);
114 for (T rec : rs) {
115
116 }
117 }
118 catch (HBqlException e) {
119 e.printStackTrace();
120 this.getQueryFuture().setCaughtException(e);
121 }
122 }
123 });
124 }
125
126 protected <T> List<T> executeQueryAndFetch(final HBqlStatement statement,
127 final Class clazz,
128 final QueryListener<T>... listeners) throws HBqlException {
129
130 final List<T> retval = Lists.newArrayList();
131
132 HResultSet<T> results = null;
133
134 try {
135 results = this.executeQuery(statement, clazz, listeners);
136
137 for (final T val : results)
138 retval.add(val);
139 }
140 finally {
141 if (results != null)
142 results.close();
143 }
144
145 return retval;
146 }
147
148 protected ExecutionResults execute(final HBqlStatement statement) throws HBqlException {
149 if (Utils.isSelectStatement(statement)) {
150 this.executeQuery(statement, null);
151 return new ExecutionResults("Query executed");
152 }
153 else {
154 return this.executeUpdate(statement);
155 }
156 }
157
158 public ExecutionResults execute(final String sql) throws HBqlException {
159 return this.execute(Utils.parseHBqlStatement(sql));
160 }
161
162 public HResultSet<HRecord> executeQuery(final String sql,
163 final QueryListener<HRecord>... listeners) throws HBqlException {
164 return this.executeQuery(Utils.parseHBqlStatement(sql), HRecord.class, listeners);
165 }
166
167 public QueryFuture executeQueryAsync(final String sql,
168 final QueryListener<HRecord>... listeners) throws HBqlException {
169 return this.executeQueryAsync(Utils.parseHBqlStatement(sql), HRecord.class, listeners);
170 }
171
172 public <T> HResultSet<T> executeQuery(final String sql,
173 final Class clazz,
174 final QueryListener<T>... listeners) throws HBqlException {
175 return this.executeQuery(Utils.parseHBqlStatement(sql), clazz, listeners);
176 }
177
178 public <T> QueryFuture executeQueryAsync(final String sql,
179 final Class clazz,
180 final QueryListener<T>... listeners) throws HBqlException {
181 return this.executeQueryAsync(Utils.parseHBqlStatement(sql), clazz, listeners);
182 }
183
184 public List<HRecord> executeQueryAndFetch(final String sql,
185 final QueryListener<HRecord>... listeners) throws HBqlException {
186 return this.executeQueryAndFetch(Utils.parseHBqlStatement(sql), HRecord.class, listeners);
187 }
188
189 public <T> List<T> executeQueryAndFetch(final String sql,
190 final Class clazz,
191 final QueryListener<T>... listeners) throws HBqlException {
192 return this.executeQueryAndFetch(Utils.parseHBqlStatement(sql), clazz, listeners);
193 }
194
195 public ExecutionResults executeUpdate(final String sql) throws HBqlException {
196 return this.executeUpdate(Utils.parseHBqlStatement(sql));
197 }
198
199 private AtomicBoolean getAtomicClosed() {
200 return this.atomicClosed;
201 }
202
203 public boolean isClosed() {
204 return this.getAtomicClosed().get();
205 }
206
207 public synchronized void close() throws HBqlException {
208 if (!this.isClosed()) {
209 if (this.getResultSet() != null)
210 this.getResultSet().close();
211
212 this.getAtomicClosed().set(true);
213 }
214 }
215 }