View Javadoc

1   /*
2    * Copyright (c) 2010.  The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.hbql.impl;
22  
23  import org.apache.expreval.client.NullColumnValueException;
24  import org.apache.expreval.client.ResultMissingColumnException;
25  import org.apache.hadoop.hbase.client.Result;
26  import org.apache.hadoop.hbase.hbql.client.HBqlException;
27  import org.apache.hadoop.hbase.hbql.mapping.ResultAccessor;
28  import org.apache.hadoop.hbase.hbql.statement.SelectStatement;
29  
30  import java.util.Iterator;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  
33  public abstract class ResultSetIterator<T, R> implements Iterator<T> {
34  
35      private final HResultSetImpl<T, R> resultSet;
36      private Iterator<Result> currentResultIterator = null;
37      private volatile T nextObject = null;
38      private final AtomicBoolean iteratorComplete = new AtomicBoolean(false);
39  
40      protected ResultSetIterator(final HResultSetImpl<T, R> resultSet) throws HBqlException {
41          this.resultSet = resultSet;
42  
43          // Prime the iterator with the first value
44          this.setNextObject(this.fetchNextObject());
45      }
46  
47      protected abstract boolean moreResultsPending();
48  
49      protected abstract Iterator<Result> getNextResultIterator() throws HBqlException;
50  
51      private HResultSetImpl<T, R> getResultSet() {
52          return this.resultSet;
53      }
54  
55      protected T getNextObject() {
56          return this.nextObject;
57      }
58  
59      protected void setNextObject(final T nextObject) {
60          this.nextObject = nextObject;
61      }
62  
63      private AtomicBoolean getIteratorComplete() {
64          return this.iteratorComplete;
65      }
66  
67      protected void markIteratorComplete() {
68          this.getIteratorComplete().set(true);
69      }
70  
71      private boolean isIteratorComplete() {
72          return this.getIteratorComplete().get();
73      }
74  
75      protected void iteratorCompleteAction() {
76          this.getResultSet().cleanUpAtEndOfIterator();
77      }
78  
79      public boolean hasNext() {
80  
81          if (this.getResultSet() != null && this.getResultSet().returnedRecordLimitMet())
82              this.markIteratorComplete();
83  
84          if (this.isIteratorComplete())
85              this.iteratorCompleteAction();
86  
87          return !this.isIteratorComplete();
88      }
89  
90      public void remove() {
91  
92      }
93  
94      private Iterator<Result> getCurrentResultIterator() {
95          return this.currentResultIterator;
96      }
97  
98      private void setCurrentResultIterator(final Iterator<Result> currentResultIterator) {
99          this.currentResultIterator = currentResultIterator;
100     }
101 
102     protected void incrementReturnedRecordCount() {
103         if (this.getResultSet() != null)
104             this.getResultSet().incrementRecordCount();
105     }
106 
107     protected void setNextObject(final T nextObject, final boolean fromExceptionCatch) {
108         this.setNextObject(nextObject);
109     }
110 
111     public T next() {
112 
113         // Save value to return;
114         final T retval = this.getNextObject();
115 
116         // Now prefetch next value so that hasNext() will be correct
117         try {
118             final T nextObject = this.fetchNextObject();
119             this.setNextObject(nextObject, false);
120         }
121         catch (HBqlException e) {
122             e.printStackTrace();
123             this.setNextObject(null, true);
124         }
125 
126         return retval;
127     }
128 
129     @SuppressWarnings("unchecked")
130     protected T fetchNextObject() throws HBqlException {
131 
132         final HResultSetImpl<T, R> rs = this.getResultSet();
133         final SelectStatement selectStatement = rs.getSelectStmt();
134         final ResultAccessor resultAccessor = selectStatement.getMappingContext().getResultAccessor();
135 
136         while (this.getCurrentResultIterator() != null || moreResultsPending()) {
137 
138             if (this.getCurrentResultIterator() == null)
139                 this.setCurrentResultIterator(this.getNextResultIterator());
140 
141             while (this.getCurrentResultIterator() != null
142                    && this.getCurrentResultIterator().hasNext()) {
143 
144                 final Result result = this.getCurrentResultIterator().next();
145 
146                 try {
147                     if (rs.getClientExpressionTree() != null
148                         && !rs.getClientExpressionTree().evaluate(rs.getHConnectionImpl(), result))
149                         continue;
150                 }
151                 catch (ResultMissingColumnException e) {
152                     continue;
153                 }
154                 catch (NullColumnValueException e) {
155                     continue;
156                 }
157 
158                 this.incrementReturnedRecordCount();
159 
160                 if (selectStatement.isAnAggregateQuery()) {
161                     this.getResultSet().getAggregateRecord().applyValues(result);
162                 }
163                 else {
164                     final T val = (T)resultAccessor.newObject(rs.getHConnectionImpl(),
165                                                               selectStatement.getMappingContext(),
166                                                               selectStatement.getSelectElementList(),
167                                                               rs.getMaxVersions(),
168                                                               result);
169 
170                     return rs.getQuery().callOnEachRow(val);
171                 }
172             }
173 
174             this.setCurrentResultIterator(null);
175             this.getResultSet().closeResultScanner(this.getResultSet().getCurrentResultScanner(), true);
176         }
177 
178         if (this.getResultSet().getSelectStmt().isAnAggregateQuery()
179             && this.getResultSet().getAggregateRecord() != null) {
180 
181             // Stash the value and then null it out for next time through
182             final AggregateRecord retval = this.getResultSet().getAggregateRecord();
183             this.getResultSet().setAggregateRecord(null);
184 
185             return rs.getQuery().callOnEachRow((T)retval);
186         }
187 
188         this.markIteratorComplete();
189         return null;
190     }
191 }