Source code for mdf_toolbox.globus_search.sub_helpers
import warnings
from mdf_toolbox.globus_search.search_helper import SearchHelper, SEARCH_LIMIT
class AggregateHelper(SearchHelper):
    """Subclass to add the ``aggregate()`` functionality to the SearchHelper.

    ``aggregate()`` is currently the only way to retrieve more than 10,000 entries
    from Globus Search, and requires a ``scroll_field`` index field.
    """

    def __init__(self, *args, **kwargs):
        """Add the AggregateHelper to a SearchHelper.

        All positional and keyword arguments are forwarded unchanged to
        ``SearchHelper.__init__()``.

        Keyword Arguments:
            scroll_field (str): The field on which to scroll. This should be a field
                    that counts/indexes the entries.
                    **Default:** ``None``.
        """
        # Read (rather than pop) the value so the parent initializer still
        # receives exactly the keyword arguments the caller supplied.
        self.scroll_field = kwargs.get("scroll_field", None)
        super().__init__(*args, **kwargs)

    def _aggregate(self, scroll_field, scroll_size=SEARCH_LIMIT):
        """Perform an advanced query, and return *all* matching results.

        Will automatically perform multiple queries in order to retrieve all results.
        The current query is split into consecutive ranges of ``scroll_field``
        values, starting at zero, and the results of each range are concatenated.

        Note:
            All ``aggregate`` queries run in advanced mode. The stored query is
            switched to advanced mode if it was not already.

        Arguments:
            scroll_field (str): The field on which to scroll. This should be a field
                    that counts/indexes the entries.
            scroll_size (int): Initial width of the ``scroll_field`` range covered
                    by each query. Must be greater than zero (only this lower
                    bound is enforced here). The width is reduced automatically
                    whenever a range matches more entries than one query returns.
                    **Default:** ``SEARCH_LIMIT``.

        Returns:
            list of dict: All matching entries.

        Raises:
            AttributeError: If ``scroll_field`` is empty, no query has been set,
                    or ``scroll_size`` is not greater than zero.
            RuntimeError: If a range of width one still matches more entries
                    than a single query returns, so scrolling cannot progress.
        """
        # Make sure scroll_field is valid
        if not scroll_field:
            raise AttributeError("scroll_field is required.")
        # Make sure the query is set
        if not self.initialized:
            raise AttributeError('No query has been set.')
        # Reject an invalid scroll size before the stored query is modified
        if scroll_size <= 0:
            raise AttributeError('Scroll size must greater than zero')
        # Warn the user if we are changing the setting of advanced
        if not self._SearchHelper__query["advanced"]:
            warnings.warn('This query will be run in advanced mode.', RuntimeWarning)
            self._SearchHelper__query["advanced"] = True

        # Get the total number of records
        total = self.search(limit=0, info=True, reset_query=False)[1]["total_query_matches"]
        # If aggregate is unnecessary, use Search automatically instead
        if total <= SEARCH_LIMIT:
            return self.search(limit=SEARCH_LIMIT, reset_query=False)

        # Scroll until all results are found
        output = []
        scroll_pos = 0
        while len(output) < total:
            # Scroll until the width is small enough to get all records
            # `scroll_id`s are unique to each dataset. If multiple datasets
            # match a certain query, the total number of matching records
            # may exceed the maximum that search will return - even if the
            # scroll width is much smaller than that maximum
            scroll_width = scroll_size
            while True:
                query = "({q}) AND ({field}:>={start} AND {field}:<{end})".format(
                    q=self._SearchHelper__query["q"], field=scroll_field,
                    start=scroll_pos, end=scroll_pos + scroll_width)
                results, info = self.search(q=query, advanced=True, info=True)
                matches = info["total_query_matches"]

                # Check to make sure that all the matching records were returned
                if matches <= len(results):
                    break

                # The width cannot shrink below one; repeating the same query
                # would never terminate, so fail loudly instead.
                if scroll_width == 1:
                    raise RuntimeError(
                        "Unable to retrieve all entries where {field} is {pos}: more "
                        "entries match than a single search can return.".format(
                            field=scroll_field, pos=scroll_pos))

                # If not, reduce the scroll width in proportion to the share of
                # matches that was actually returned. Multiply before dividing:
                # len(results) < matches here, so dividing first always gives 0.
                new_width = scroll_width * len(results) // matches
                # scroll_width should never be 0, and should only be 1 in rare circumstances
                scroll_width = new_width if new_width > 1 else max(scroll_width // 2, 1)

            # Append the results to the output
            output.extend(results)
            scroll_pos += scroll_width

        return output

    def aggregate(self, q=None, scroll_size=SEARCH_LIMIT, reset_query=True, **kwargs):
        """Perform an advanced query, and return *all* matching results.

        Will automatically perform multiple queries in order to retrieve all results.

        Note:
            All ``aggregate`` queries run in advanced mode, and ``info`` is not available.

        Arguments:
            q (str): The query to execute. **Default:** The current helper-formed query, if any.
                    There must be some query to execute.
            scroll_size (int): Initial width of the ``scroll_field`` range covered
                    by each query. Must be greater than zero.
                    **Default:** ``SEARCH_LIMIT``.
            reset_query (bool): If ``True``, will destroy the current query after execution
                    and start a fresh one.
                    If ``False``, will keep the current query set.
                    Only affects the helper-formed query; when ``q`` is given,
                    this helper's own query is never touched.
                    **Default:** ``True``.

        Keyword Arguments:
            scroll_field (str): The field on which to scroll. This should be a field
                    that counts/indexes the entries.
                    This should be set in ``self.scroll_field``, but if your application
                    requires separate scroll fields for a single client,
                    it can be set in this way as well.
                    **Default**: ``self.scroll_field``.

        Returns:
            list of dict: All matching records.
        """
        scroll_field = kwargs.get("scroll_field", self.scroll_field)

        # If q not specified, use internal, helper-built query
        if q is None:
            res = self._aggregate(scroll_field=scroll_field, scroll_size=scroll_size)
            if reset_query:
                self.reset_query()
            return res

        # Otherwise, run an independent query as SearchHelper.search() does.
        helper = self.__class__(index=self.index, q=q, advanced=True,
                                search_client=self._SearchHelper__search_client)
        # The throwaway helper is built without a scroll field of its own, so
        # hand over the one resolved above; otherwise the delegated call would
        # fail with "scroll_field is required."
        if scroll_field:
            helper.scroll_field = scroll_field
        return helper.aggregate(scroll_size=scroll_size, reset_query=reset_query)