View Javadoc

1   /*
2    * Copyright (c) 2010.  The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.hbql.impl;
22  
23  import org.apache.expreval.expr.ExpressionTree;
24  import org.apache.expreval.expr.literal.DateLiteral;
25  import org.apache.hadoop.hbase.client.ResultScanner;
26  import org.apache.hadoop.hbase.hbql.client.HBqlException;
27  import org.apache.hadoop.hbase.hbql.client.HResultSet;
28  import org.apache.hadoop.hbase.hbql.mapping.MappingContext;
29  import org.apache.hadoop.hbase.hbql.statement.SelectStatement;
30  import org.apache.hadoop.hbase.hbql.statement.args.WithArgs;
31  import org.apache.hadoop.hbase.hbql.statement.select.RowRequest;
32  import org.apache.hadoop.hbase.hbql.util.Lists;
33  
34  import java.io.IOException;
35  import java.util.Iterator;
36  import java.util.List;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  import java.util.concurrent.atomic.AtomicLong;
39  
40  public abstract class HResultSetImpl<T, R> implements HResultSet<T> {
41  
42      // Record count keeps track of values that have evaluated as true and returned to user
43      private final AtomicLong returnedRecordCount = new AtomicLong(0L);
44      private final long returnedRecordLimit;
45  
46      private AtomicBoolean atomicClosed = new AtomicBoolean(false);
47      private final List<ResultScanner> resultScannerList = Lists.newArrayList();
48      private ResultScanner currentResultScanner = null;
49      private AggregateRecord aggregateRecord;
50      private final HTableWrapper tableWrapper;
51      private final AtomicBoolean tableWrapperClosed = new AtomicBoolean(false);
52  
53      private final Query<T> query;
54      private final ExpressionTree clientExpressionTree;
55      private final CompletionQueueExecutor<R> completionQueueExecutor;
56  
57      protected HResultSetImpl(final Query<T> query,
58                               final CompletionQueueExecutor<R> completionQueueExecutor) throws HBqlException {
59          this.query = query;
60          this.completionQueueExecutor = completionQueueExecutor;
61          this.clientExpressionTree = this.getWithArgs().getClientExpressionTree();
62          this.returnedRecordLimit = this.getWithArgs().getLimit();
63  
64          //this.setTableWrapper(this.getHConnectionImpl().newHTableWrapper(this.getWithArgs(), this.getTableName()));
65          this.tableWrapper = this.getHConnectionImpl().newHTableWrapper(this.getWithArgs(), this.getTableName());
66          this.getQuery().getSelectStmt().determineIfAggregateQuery();
67  
68          if (this.getSelectStmt().isAnAggregateQuery())
69              this.setAggregateRecord(AggregateRecord.newAggregateRecord(this.getQuery().getSelectStmt()));
70  
71          // Set it once per evaluation
72          DateLiteral.resetNow();
73  
74          if (this.getCompletionQueueExecutor() != null) {
75              // Submit work to executor
76              final List<RowRequest> rowRequestList = this.getQuery().getRowRequestList();
77              this.getCompletionQueueExecutor().submitWorkToSubmitterThread(
78                      new Runnable() {
79                          public void run() {
80                              submitWork(rowRequestList);
81                              getCompletionQueueExecutor().putCompletion();
82                          }
83                      }
84              );
85          }
86      }
87  
88      protected abstract void submitWork(final List<RowRequest> rowRequestList);
89  
90      public abstract Iterator<T> iterator();
91  
92      protected void cleanUpAtEndOfIterator() {
93  
94          try {
95              try {
96                  if (this.getTableWrapper() != null)
97                      this.getTableWrapper().getHTable().close();
98              }
99              catch (IOException e) {
100                 // No op
101                 e.printStackTrace();
102             }
103 
104             this.getQuery().callOnQueryComplete();
105         }
106         finally {
107             // release to table pool
108             //if (this.getTableWrapper() != null)
109             if (!this.getTableWrapperClosed().get())
110                 this.getTableWrapper().releaseHTable();
111 
112             this.getTableWrapperClosed().set(true);
113 
114             this.close();
115         }
116     }
117 
118     private AtomicBoolean getTableWrapperClosed() {
119         return this.tableWrapperClosed;
120     }
121 
122     protected CompletionQueueExecutor<R> getCompletionQueueExecutor() {
123         return this.completionQueueExecutor;
124     }
125 
126     protected void setAggregateRecord(final AggregateRecord aggregateRecord) {
127         this.aggregateRecord = aggregateRecord;
128     }
129 
130     protected AggregateRecord getAggregateRecord() {
131         return this.aggregateRecord;
132     }
133 
134     protected List<ResultScanner> getResultScannerList() {
135         return this.resultScannerList;
136     }
137 
138     protected ResultScanner getCurrentResultScanner() {
139         return this.currentResultScanner;
140     }
141 
142     protected void setCurrentResultScanner(final ResultScanner currentResultScanner) {
143         // First close previous ResultScanner before reassigning
144         closeResultScanner(getCurrentResultScanner(), true);
145         this.currentResultScanner = currentResultScanner;
146         getResultScannerList().add(getCurrentResultScanner());
147     }
148 
149     protected void closeResultScanner(final ResultScanner scanner, final boolean removeFromList) {
150         if (scanner != null) {
151             try {
152                 scanner.close();
153             }
154             catch (Exception e) {
155                 // Do nothing
156             }
157             if (removeFromList)
158                 getResultScannerList().remove(scanner);
159         }
160     }
161 
162     private AtomicBoolean getAtomicClosed() {
163         return this.atomicClosed;
164     }
165 
166     public boolean isClosed() {
167         return this.getAtomicClosed().get();
168     }
169 
170     public synchronized void close() {
171         if (!this.isClosed()) {
172             for (final ResultScanner scanner : this.getResultScannerList())
173                 closeResultScanner(scanner, false);
174 
175             this.getResultScannerList().clear();
176 
177             if (this.getCompletionQueueExecutor() != null) {
178                 this.getCompletionQueueExecutor().close();
179             }
180 
181             this.getAtomicClosed().set(true);
182         }
183     }
184 
185     protected long getReturnedRecordCount() {
186         return this.returnedRecordCount.get();
187     }
188 
189     protected void incrementRecordCount() {
190         this.returnedRecordCount.incrementAndGet();
191     }
192 
193     protected boolean returnedRecordLimitMet() {
194         return this.getReturnedRecordLimit() > 0 && this.getReturnedRecordCount() > this.getReturnedRecordLimit();
195     }
196 
197     protected long getReturnedRecordLimit() {
198         return this.returnedRecordLimit;
199     }
200 
201     protected int getMaxVersions() throws HBqlException {
202         return this.getQuery().getSelectStmt().getWithArgs().getMaxVersions();
203     }
204 
205     protected Query<T> getQuery() {
206         return this.query;
207     }
208 
209     protected ExpressionTree getClientExpressionTree() {
210         return this.clientExpressionTree;
211     }
212 
213     protected HTableWrapper getTableWrapper() {
214         return this.tableWrapper;
215     }
216 
217     protected HConnectionImpl getHConnectionImpl() {
218         return this.getQuery().getHConnectionImpl();
219     }
220 
221     protected SelectStatement getSelectStmt() {
222         return this.getQuery().getSelectStmt();
223     }
224 
225     protected MappingContext getMappingContext() {
226         return this.getSelectStmt().getMappingContext();
227     }
228 
229     protected String getTableName() {
230         return this.getMappingContext().getMapping().getTableName();
231     }
232 
233     protected WithArgs getWithArgs() {
234         return this.getSelectStmt().getWithArgs();
235     }
236 }