1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.hbql.impl;
22
23 import org.apache.expreval.expr.ExpressionTree;
24 import org.apache.expreval.expr.literal.DateLiteral;
25 import org.apache.hadoop.hbase.client.ResultScanner;
26 import org.apache.hadoop.hbase.hbql.client.HBqlException;
27 import org.apache.hadoop.hbase.hbql.client.HResultSet;
28 import org.apache.hadoop.hbase.hbql.mapping.MappingContext;
29 import org.apache.hadoop.hbase.hbql.statement.SelectStatement;
30 import org.apache.hadoop.hbase.hbql.statement.args.WithArgs;
31 import org.apache.hadoop.hbase.hbql.statement.select.RowRequest;
32 import org.apache.hadoop.hbase.hbql.util.Lists;
33
34 import java.io.IOException;
35 import java.util.Iterator;
36 import java.util.List;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicLong;
39
40 public abstract class HResultSetImpl<T, R> implements HResultSet<T> {
41
42
43 private final AtomicLong returnedRecordCount = new AtomicLong(0L);
44 private final long returnedRecordLimit;
45
46 private AtomicBoolean atomicClosed = new AtomicBoolean(false);
47 private final List<ResultScanner> resultScannerList = Lists.newArrayList();
48 private ResultScanner currentResultScanner = null;
49 private AggregateRecord aggregateRecord;
50 private final HTableWrapper tableWrapper;
51 private final AtomicBoolean tableWrapperClosed = new AtomicBoolean(false);
52
53 private final Query<T> query;
54 private final ExpressionTree clientExpressionTree;
55 private final CompletionQueueExecutor<R> completionQueueExecutor;
56
57 protected HResultSetImpl(final Query<T> query,
58 final CompletionQueueExecutor<R> completionQueueExecutor) throws HBqlException {
59 this.query = query;
60 this.completionQueueExecutor = completionQueueExecutor;
61 this.clientExpressionTree = this.getWithArgs().getClientExpressionTree();
62 this.returnedRecordLimit = this.getWithArgs().getLimit();
63
64
65 this.tableWrapper = this.getHConnectionImpl().newHTableWrapper(this.getWithArgs(), this.getTableName());
66 this.getQuery().getSelectStmt().determineIfAggregateQuery();
67
68 if (this.getSelectStmt().isAnAggregateQuery())
69 this.setAggregateRecord(AggregateRecord.newAggregateRecord(this.getQuery().getSelectStmt()));
70
71
72 DateLiteral.resetNow();
73
74 if (this.getCompletionQueueExecutor() != null) {
75
76 final List<RowRequest> rowRequestList = this.getQuery().getRowRequestList();
77 this.getCompletionQueueExecutor().submitWorkToSubmitterThread(
78 new Runnable() {
79 public void run() {
80 submitWork(rowRequestList);
81 getCompletionQueueExecutor().putCompletion();
82 }
83 }
84 );
85 }
86 }
87
88 protected abstract void submitWork(final List<RowRequest> rowRequestList);
89
90 public abstract Iterator<T> iterator();
91
92 protected void cleanUpAtEndOfIterator() {
93
94 try {
95 try {
96 if (this.getTableWrapper() != null)
97 this.getTableWrapper().getHTable().close();
98 }
99 catch (IOException e) {
100
101 e.printStackTrace();
102 }
103
104 this.getQuery().callOnQueryComplete();
105 }
106 finally {
107
108
109 if (!this.getTableWrapperClosed().get())
110 this.getTableWrapper().releaseHTable();
111
112 this.getTableWrapperClosed().set(true);
113
114 this.close();
115 }
116 }
117
118 private AtomicBoolean getTableWrapperClosed() {
119 return this.tableWrapperClosed;
120 }
121
122 protected CompletionQueueExecutor<R> getCompletionQueueExecutor() {
123 return this.completionQueueExecutor;
124 }
125
126 protected void setAggregateRecord(final AggregateRecord aggregateRecord) {
127 this.aggregateRecord = aggregateRecord;
128 }
129
130 protected AggregateRecord getAggregateRecord() {
131 return this.aggregateRecord;
132 }
133
134 protected List<ResultScanner> getResultScannerList() {
135 return this.resultScannerList;
136 }
137
138 protected ResultScanner getCurrentResultScanner() {
139 return this.currentResultScanner;
140 }
141
142 protected void setCurrentResultScanner(final ResultScanner currentResultScanner) {
143
144 closeResultScanner(getCurrentResultScanner(), true);
145 this.currentResultScanner = currentResultScanner;
146 getResultScannerList().add(getCurrentResultScanner());
147 }
148
149 protected void closeResultScanner(final ResultScanner scanner, final boolean removeFromList) {
150 if (scanner != null) {
151 try {
152 scanner.close();
153 }
154 catch (Exception e) {
155
156 }
157 if (removeFromList)
158 getResultScannerList().remove(scanner);
159 }
160 }
161
162 private AtomicBoolean getAtomicClosed() {
163 return this.atomicClosed;
164 }
165
166 public boolean isClosed() {
167 return this.getAtomicClosed().get();
168 }
169
170 public synchronized void close() {
171 if (!this.isClosed()) {
172 for (final ResultScanner scanner : this.getResultScannerList())
173 closeResultScanner(scanner, false);
174
175 this.getResultScannerList().clear();
176
177 if (this.getCompletionQueueExecutor() != null) {
178 this.getCompletionQueueExecutor().close();
179 }
180
181 this.getAtomicClosed().set(true);
182 }
183 }
184
185 protected long getReturnedRecordCount() {
186 return this.returnedRecordCount.get();
187 }
188
189 protected void incrementRecordCount() {
190 this.returnedRecordCount.incrementAndGet();
191 }
192
193 protected boolean returnedRecordLimitMet() {
194 return this.getReturnedRecordLimit() > 0 && this.getReturnedRecordCount() > this.getReturnedRecordLimit();
195 }
196
197 protected long getReturnedRecordLimit() {
198 return this.returnedRecordLimit;
199 }
200
201 protected int getMaxVersions() throws HBqlException {
202 return this.getQuery().getSelectStmt().getWithArgs().getMaxVersions();
203 }
204
205 protected Query<T> getQuery() {
206 return this.query;
207 }
208
209 protected ExpressionTree getClientExpressionTree() {
210 return this.clientExpressionTree;
211 }
212
213 protected HTableWrapper getTableWrapper() {
214 return this.tableWrapper;
215 }
216
217 protected HConnectionImpl getHConnectionImpl() {
218 return this.getQuery().getHConnectionImpl();
219 }
220
221 protected SelectStatement getSelectStmt() {
222 return this.getQuery().getSelectStmt();
223 }
224
225 protected MappingContext getMappingContext() {
226 return this.getSelectStmt().getMappingContext();
227 }
228
229 protected String getTableName() {
230 return this.getMappingContext().getMapping().getTableName();
231 }
232
233 protected WithArgs getWithArgs() {
234 return this.getSelectStmt().getWithArgs();
235 }
236 }