The pipeline manager is in charge of loading the pipeline configuration and initializing its implementation. It also checks for updates to the pipeline configuration and replaces the current implementation with a new one.
The pipeline manager is a singleton instance accessible from anywhere in the code, and it gives access to every pipeline loaded on it.
You can get the pipeline manager in your code with the following import:
```python
from app.pipeline import pipeline_manager
```
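The behaviour described above (load a configuration, build an implementation, and swap it when the configuration changes) can be sketched roughly as follows. This is only an illustration under stated assumptions: `SketchPipelineManager`, its `load` method, and the `build` callback are hypothetical names and are not part of `app.pipeline`; how the real manager detects configuration updates is not shown on this page.

```python
import copy
from typing import Any, Callable, Dict


class SketchPipelineManager:
    """Hypothetical stand-in for the pipeline manager; not the real class."""

    def __init__(self, build: Callable[[Dict[str, Any]], Any]) -> None:
        self._build = build  # assumed factory: configuration -> implementation
        self._configs: Dict[str, Dict[str, Any]] = {}
        self._pipelines: Dict[str, Any] = {}

    def load(self, name: str, config: Dict[str, Any]) -> bool:
        """Load or refresh a pipeline; return True when an implementation was built."""
        if self._configs.get(name) == config:
            return False  # configuration unchanged: keep the current implementation
        self._configs[name] = copy.deepcopy(config)
        self._pipelines[name] = self._build(config)  # replace the old implementation
        return True

    def get_pipeline(self, name: str) -> Any:
        return self._pipelines[name]


# A single module-level instance plays the role of the singleton.
sketch_manager = SketchPipelineManager(build=lambda cfg: ("impl", len(cfg["stages"])))

sketch_manager.load("search", {"enable": True, "stages": ["highlight"]})
first = sketch_manager.get_pipeline("search")
rebuilt = sketch_manager.load("search", {"enable": True, "stages": ["highlight", "search"]})
second = sketch_manager.get_pipeline("search")
```

Because the instance lives at module level, every importer receives the same object, which is one common way to obtain singleton behaviour in Python.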
Pipeline
A pipeline represents a process divided into one or more steps (stages). Each stage specialises in a single action, such as data manipulation, a database query, or a third-party request. The pipeline initially gets the data from the request body and sends it to the first stage as intermediate data. It is also the pipeline's job to generate and update the final data with the responses of each stage.
Besides managing the data, the pipeline also manages the execution flows, which are Process and Post_process (there is a third flow called get_ui_config, but it is still being designed). For each flow, the pipeline executes the stages one at a time, in the order they were defined. The only exception is the stages inside a parallel stage, and in that case it is the parallel stage itself that executes its stages in parallel.
The normal execution of a pipeline runs the Process flow first and, once every stage has executed, runs Post_process. At the end of Post_process, the final response is converted into JSON and sent in the HTTP response.
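The ordering of the two flows can be illustrated with a small, self-contained sketch. It assumes each stage exposes one coroutine per flow; `SketchStage`, `execute`, and the `calls` trace are hypothetical names introduced only for this example and do not reflect the framework's actual classes or signatures.

```python
import asyncio
import json
from typing import Any, Dict, List


class SketchStage:
    """Hypothetical stage exposing one coroutine per flow."""

    def __init__(self, name: str) -> None:
        self.name = name

    async def process(self, intermediate: Dict[str, Any], final: Dict[str, Any]) -> str:
        return f"{self.name} processed {intermediate['q']}"

    async def post_process(self, intermediate: Dict[str, Any], final: Dict[str, Any]) -> None:
        final[self.name] = final[self.name].upper()


async def execute(stages: List[SketchStage], body: Dict[str, Any]) -> str:
    intermediate = dict(body)  # the request body seeds the intermediate data
    final: Dict[str, Any] = {}
    calls: List[str] = []  # trace kept only to make the ordering visible
    for stage in stages:  # Process flow, in definition order
        final[stage.name] = await stage.process(intermediate, final)
        calls.append(f"{stage.name}.process")
    for stage in stages:  # Post_process starts only after every stage ran Process
        await stage.post_process(intermediate, final)
        calls.append(f"{stage.name}.post_process")
    final["calls"] = calls
    return json.dumps(final)  # the final data is serialised for the HTTP response


response = json.loads(asyncio.run(execute([SketchStage("a"), SketchStage("b")], {"q": "x"})))
```

In this sketch `response["calls"]` lists both `process` calls before any `post_process` call, matching the order described above.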
The pipeline configuration has the following properties:
Property | Type | Description |
---|---|---|
enable | boolean | Enable pipeline configuration for execution (Default: true) |
stages | array | List of stages to execute (Minimum stages: 1) |
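As a minimal sketch of the two properties in the table, the following dataclass applies the documented default for `enable` and the minimum of one stage. `SketchPipelineConfig` is a hypothetical name used for illustration; the framework's own configuration class may validate differently.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class SketchPipelineConfig:
    """Hypothetical mirror of the two properties in the table above."""

    stages: List[Dict[str, Any]]  # at least one stage is required
    enable: bool = True           # pipelines are enabled unless stated otherwise

    def __post_init__(self) -> None:
        if len(self.stages) < 1:
            raise ValueError("a pipeline needs at least one stage")


config = SketchPipelineConfig(stages=[{"type": "QueryStage", "name": "search"}])

try:
    SketchPipelineConfig(stages=[])
    empty_rejected = False
except ValueError:
    empty_rejected = True
```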
If you need to use a pipeline in code, you can use the pipeline manager to get the desired pipeline and execute it:
```python
# `req` is the incoming request; the pipeline always needs it.
pipeline: PipelineImpl = pipeline_manager.get_pipeline('search')
result = await pipeline.execute_pipeline(req, props=payload.dict(exclude_none=True))
```
Warning |
---|
A Pipeline always needs the request |
A stage is a module specialised in a single action, using the data available in either the intermediate or the final section. The range of actions is limited only by your imagination: a stage can transform data, call a database and retrieve new data, call a third-party service and wait for its response, generate files, and so on. We recommend keeping each stage as focused as possible, so that a pipeline can be restructured if needed and as many stages as possible can be reused.
The configuration of a stage varies from one stage to another, but all stages share a small set of parameters:
Property | Type | Description |
---|---|---|
type | string | Stage class name (Only required when using the JSON format) |
enable | boolean | Enable stage for execution (Default: true) |
name | string | Name for this specific stage. Used on the intermediate and final parameters. |
save_to_intermediate | boolean | If true, the result of the stage will be stored in the intermediate instead of the final section. This will make the response of the stage unavailable for the final result. (Default: false) |
expand_result | boolean | Indicates if the result of this stage should be expanded into the final data dictionary instead of appended as the standard. (Default: false) |
ui_only | object | Section specific for UI configuration. This configuration will be retrieved when necessary and should affect the process of the stage. |
halt_on_exception | boolean | Indicates if, in case of an exception, the pipeline should be interrupted. (Default: false) |
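To make the effect of the shared parameters concrete, here is a rough sketch of how a runner could route a stage result. `run_stage` and `failing` are hypothetical helpers, and the behaviour when `halt_on_exception` is false (skip the stage and continue) is an assumption, since the table only states what happens when it is true.

```python
from typing import Any, Callable, Dict


def run_stage(cfg: Dict[str, Any], action: Callable[[], Any],
              intermediate: Dict[str, Any], final: Dict[str, Any]) -> None:
    """Run one stage and store its result according to the shared parameters."""
    if not cfg.get("enable", True):
        return  # disabled stages are skipped
    try:
        result = action()
    except Exception:
        if cfg.get("halt_on_exception", False):
            raise  # interrupt the whole pipeline
        return  # assumption: otherwise the stage is skipped and the pipeline continues
    if cfg.get("save_to_intermediate", False):
        intermediate[cfg["name"]] = result  # hidden from the final result
    elif cfg.get("expand_result", False):
        final.update(result)  # merge the keys of the result into the final data
    else:
        final[cfg["name"]] = result  # default: stored under the stage name


def failing() -> Any:
    raise RuntimeError("boom")


intermediate: Dict[str, Any] = {}
final: Dict[str, Any] = {}
run_stage({"name": "search"}, lambda: {"hits": 3}, intermediate, final)
run_stage({"name": "filters", "save_to_intermediate": True}, lambda: ["adult"], intermediate, final)
run_stage({"name": "extra", "expand_result": True}, lambda: {"total": 3}, intermediate, final)
run_stage({"name": "broken"}, failing, intermediate, final)
```

After these calls, `final` holds the `search` result under its stage name plus the expanded `total` key, while the `filters` result is only visible in `intermediate`.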
Info |
---|
To learn more about stages go to How to Create a Stage |
Before working with a pipeline, please take the following suggestions into account:
All Pipeline:
Intended for automatic generation, the web UI, and automation, NOT for building pipelines manually
Warning |
---|
Don't use this format |
Intended for manual usage in an IDE. All settings should have a class for either the Dictionary or the Pipeline format, so why not use the configuration class directly?
No functional difference from the previous one, but much more manageable when defining and modifying the pipeline
```python
from utils.constants import DEFAULT_ENGINE_NAME

PIPELINE = {
    'enable': True,
    'stages': [
        {
            "type": "HighlightStage",
            "enable": True,
            "name": "highlight",
            "save_to_intermediate": False,
            "expand_result": False,
            "ui_only": None,
            'engine_name': DEFAULT_ENGINE_NAME,
            "pre_tags": ["<strong>"],
            "pos_tags": ["</strong>"],
            "tags_schema": None,
            "force_source": False,
            "require_field_match": True,
            "number_of_fragments": 100,
            "fragment_size": 100,
            "order": None,
            "fields": [
                {
                    "field": "title", "fragment_size": None, "number_of_fragments": 1,
                    "highlight_query": None, "type": None, "force_source": False
                },
                {
                    "field": "overview", "fragment_size": None, "number_of_fragments": 5,
                    "highlight_query": None, "type": None, "force_source": False
                },
                {
                    "field": "url", "fragment_size": None, "number_of_fragments": 1,
                    "highlight_query": None, "type": None, "force_source": False
                }
            ]
        },
        {
            "aggs": [
                {
                    "type": "term",
                    "field": "metadata.genres.name.keyword",
                    "negated": False,
                    "must": True,
                    "id": "genres",
                    "ui_only": {
                        "display_name": "Genres", "hide_count": False, "hide_if_one": False,
                        "translate": None, "max_display": 5, "mime_icon": False,
                        "expanded": True, "enable_negative": True
                    },
                    "aggs": [
                        {"id": "top_hits", "type": "top_hits"},
                        {
                            "type": "term",
                            "field": "metadata.spoken_languages.name.keyword",
                            "negated": False,
                            "must": True,
                            "id": "spoken_languages",
                            "ui_only": {
                                "display_name": "Spoken Languages", "hide_count": False,
                                "hide_if_one": False, "translate": None, "max_display": 5,
                                "mime_icon": False, "expanded": False, "enable_negative": False
                            },
                            "aggs": None,
                            "max_values": None,
                            "multi_select": False,
                            "operation": "or",
                            "order": None,
                            "exclude_terms": None,
                            "include_terms": None
                        }],
                    "max_values": 100,
                    "multi_select": True,
                    "operation": "or",
                    "order": None,
                    "exclude_terms": None,
                    "include_terms": None
                },
                {
                    "type": "bool",
                    "field": "adult",
                    "negated": False,
                    "must": True,
                    "id": "adult",
                    "ui_only": {
                        "display_name": "Adult", "hide_count": False, "hide_if_one": False,
                        "translate": None, "display_True": "True", "display_False": "False"
                    },
                    "aggs": None
                },
                {
                    "type": "date_histogram",
                    "field": "release_date",
                    "negated": False,
                    "must": True,
                    "id": "release_date_h",
                    "ui_only": {
                        "display_name": "Release Date", "hide_if_one": False, "hide_count": False,
                        "translate": None, "level": 0
                    },
                    "aggs": None,
                    "calendar_interval": ["1y", "1q", "1M", "1d"],
                    "format": ["yyyy", "'Q'q", "MMMM", "E dd"],
                    "time_zone": ["-06:00"],
                    "min_doc_count": [100, 1],
                    "offset": None,
                    "order": None
                },
                {
                    "type": "histogram",
                    "field": "metadata.budget",
                    "negated": False,
                    "must": True,
                    "id": "budget",
                    "ui_only": {
                        "display_name": "Budget", "hide_if_one": False, "hide_count": False,
                        "translate": None, "level": 0
                    },
                    "aggs": None,
                    "interval": [10000000, 1000000],
                    "min_doc_count": [100, 10, 1],
                    "order": None,
                    "offset": None
                },
                {
                    "type": "slider",
                    "field": "metadata.budget",
                    "negated": False,
                    "must": True,
                    "id": "budget_slider",
                    "ui_only": {
                        "display_name": "Budget", "hide_count": False, "hide_if_one": False,
                        "translate": None, "min": None, "max": None, "show_ticks": True
                    },
                    "aggs": None
                },
                {
                    "type": "date_range",
                    "field": "release_date",
                    "negated": False,
                    "must": True,
                    "id": "release_date_dr",
                    "ui_only": {
                        "display_name": "Release Range", "hide_count": False,
                        "hide_if_one": False, "translate": None
                    },
                    "aggs": None,
                    "format": "MM-yyyy",
                    "ranges": [
                        {"key": "minus_10", "start": "now-10y", "start_display": None,
                         "end": None, "end_display": None},
                        {"key": None, "start": "01-1960", "start_display": None,
                         "end": "01-1970", "end_display": None},
                        {"key": None, "start": None, "start_display": None,
                         "end": "now-10M/M", "end_display": None}
                    ]
                },
                {
                    "type": "range",
                    "field": "metadata.budget",
                    "negated": False,
                    "must": True,
                    "id": "budget_r",
                    "ui_only": {
                        "display_name": "Budget Range", "hide_count": False,
                        "hide_if_one": False, "translate": None
                    },
                    "aggs": None,
                    "ranges": [
                        {"key": "< 10,000,000", "start": None, "end": "10000000"},
                        {"key": "> 10,000,000", "start": "10000000", "end": None},
                        {"key": "50 to 100 millions", "start": "50000000", "end": "100000000"}
                    ],
                    "multi_select": False
                }
            ],
            "type": "DynamicAggStage",
            "enable": True,
            "name": "aggregations",
            "save_to_intermediate": False,
            "expand_result": False,
            "ui_only": None,
            "filters_name": "aggregations_filters",
            'engine_name': DEFAULT_ENGINE_NAME,
            "search_response": "search",
            "ui_aggregations": True
        },
        {
            "type": "FilterStage",
            "enable": True,
            "name": "filters",
            "engine_name": DEFAULT_ENGINE_NAME,
            "save_to_intermediate": False,
            "expand_result": False,
            "filters": [
                {"type": "term", "field": "adult", "negated": False, "must": True,
                 "value": False, "multi_select": False}
            ]
        },
        {
            'type': 'QueryStage',
            'engine_name': DEFAULT_ENGINE_NAME,
            'enable': True,
            'qpl_enable': True,
            'name': 'search',
            'index': 'movies',
            'fields': ['title', 'overview', 'url', 'status', 'metadata',
                       'metadata.production_companies', 'metadata.cast', 'metadata.directors'],
            'range_fields': ['metadata.release_date'],
            'fetch_fields': ['title', 'adult', 'overview', 'url', 'release_date', 'status', 'video',
                             'metadata', 'metadata.production_companies', 'metadata.cast',
                             'metadata.directors'],
            'implicit_operator': 'or',
            'aggregations': 'aggregations',
            'aggregations_filters': 'aggregations_filters',
            'ui_only': {
                'sort': {
                    'default': {'field': '_score', 'display_name': 'Score', 'order': 'desc'},
                    'options': [
                        {'field': '_score', 'display_name': 'Score', 'order': 'desc'},
                        {'field': 'release_date', 'display_name': 'Release Date', 'order': 'asc'},
                        {'field': 'metadata.budget', 'display_name': 'Budget', 'order': 'desc'}
                    ]
                },
                'page_size': {'default': 25, 'options': [25, 50, 100]}
            },
            'sort': {'field': '_score', 'order': 'desc'},
            'page_size_default': 25,
            'highlight_stage': 'highlight'
        },
        {
            'type': 'DynamicResultsStage',
            'engine_name': DEFAULT_ENGINE_NAME,
            'enable': True,
            'name': 'hits',
            'title': {'type': 'url', 'field': 'title', 'link': 'url'},
            'sub_title': {'type': 'text', 'field': 'url'},
            'thumbnail': None,
            'non_displayed_fields': [],
            'header': [],
            'body': [
                [{'type': 'bool', 'label': 'Adult', 'field': 'adult'},
                 {'type': 'number', 'label': 'Popularity', 'field': 'metadata.popularity',
                  'format': '1.2'}],
                [{'type': 'text', 'field': 'overview'}],
                [{'type': 'text', 'label': 'Status', 'field': 'status'},
                 {'type': 'date', 'label': 'Release Date', 'field': 'release_date',
                  'format': '%Y-%m-%d'}],
                [{'type': 'thumbnails', 'field': 'metadata.cast[*].profile_path_w185',
                  'alt': 'metadata.cast[*].name', 'detail': 'metadata.cast[*].name',
                  'full_size': 'metadata.cast[*].profile_path'}]
            ],
            'metadata': {
                'General': {
                    'fields': [
                        {'type': 'chip', 'label': 'Chip', 'field': 'metadata.genres.name',
                         'unique': True},
                        {'type': 'text', 'field': 'title', 'label': 'Title'}
                    ],
                    'sort': 'asc'
                },
            },
            'search_response': 'search'
        },
    ]
}
```
```python
from app.pipeline import Pipeline
from app.pipeline.stages import (HighlightStage, DynamicAggStage, FilterStage,
                                 DynamicResultsStage, QueryStage)
from app.pipeline.stages.query import QueryStageUIConfig
from framework.aggregations import *
from framework.fields import *
from framework.filters import *
from models.engines import HighlightField, SortConfig, PageSizeConfig, SortEntry
from models.utils import SortOrder
from utils.constants import DEFAULT_ENGINE_NAME

SEARCH_STAGE_NAME = 'search'

################################################################################
# PIPELINE STRUCTURE
################################################################################
PIPELINE = Pipeline(
    enable=True,
    stages=[
        HighlightStage(
            enable=True,
            name='highlight',
            engine_name=DEFAULT_ENGINE_NAME,
            pre_tags=['<strong>'],
            pos_tags=['</strong>'],
            tags_schema=None,
            force_source=False,
            require_field_match=True,
            number_of_fragments=3,
            fragment_size=100,
            order=None,
            fields=[
                HighlightField(field='title', number_of_fragments=1),
                HighlightField(field='overview', number_of_fragments=5),
                HighlightField(field='url', number_of_fragments=1),
            ]
        ),
        FilterStage(
            enable=True,
            name='filters',
            engine_name=DEFAULT_ENGINE_NAME,
            save_to_intermediate=False,
            expand_result=False,
            filters=[
                TermFilter(field='adult', negated=False, must=True, value=False, multi_select=False)
            ]
        ),
        DynamicAggStage(
            enable=True,
            name='aggregations',
            filters_name='aggregations_filters',
            save_to_intermediate=False,
            expand_result=False,
            engine_name=DEFAULT_ENGINE_NAME,
            search_response=SEARCH_STAGE_NAME,
            # This generates the aggregations for the UI; if you don't want them, set it to False
            ui_aggregations=True,
            aggs=[
                TermAgg(
                    id='genres',
                    field='metadata.genres.name.keyword',
                    must=True,
                    max_values=100,
                    multi_select=True,
                    operation='or',
                    order=None,
                    exclude_terms=None,
                    include_terms=None,
                    ui_only=TermAggUI(display_name='Genres', max_display=5, expanded=True),
                    aggs=[
                        TopHitsAgg(id='top_hits', type='top_hits'),
                        TermAgg(
                            field='metadata.spoken_languages.name.keyword',
                            negated=False,
                            must=True,
                            id='spoken_languages',
                            ui_only=TermAggUI(display_name='Spoken Languages', max_display=5,
                                              expanded=False),
                            aggs=None,
                            max_values=None,
                            multi_select=False,
                            operation='or',
                            order=None,
                            exclude_terms=None,
                            include_terms=None
                        )]
                ),
                BoolAgg(
                    id='adult',
                    field='adult',
                    negated=False,
                    must=True,
                    ui_only=BoolAggUI(display_name='Adult', display_True='True',
                                      display_False='False')
                ),
                DateHistogramAgg(
                    field='release_date',
                    negated=False,
                    must=True,
                    id='release_date_h',
                    ui_only=DateHistogramAggUI(display_name='Release Date'),
                    calendar_interval=['1y', '1q', '1M', '1d'],
                    format=['yyyy', '\'Q\'q', 'MMMM', 'E dd'],
                    time_zone=['-06:00'],
                    min_doc_count=[100, 1],
                    offset=None,
                    order=None
                ),
                HistogramAgg(
                    id='budget',
                    field='metadata.budget',
                    negated=False,
                    must=True,
                    interval=[10000000, 1000000],
                    min_doc_count=[100, 10, 1],
                    order=None,
                    offset=None,
                    ui_only=HistogramAggUI(display_name='Budget')
                ),
                SliderAgg(
                    id='budget_slider',
                    field='metadata.budget',
                    negated=False,
                    must=True,
                    ui_only=SliderAggUI(display_name='Budget', translate=None, min=None, max=None,
                                        show_ticks=True),
                ),
                DateRangeAgg(
                    field='release_date',
                    negated=False,
                    must=True,
                    id='release_date_dr',
                    ui_only=DateRangeAggUI(display_name='Release Range', hide_count=False,
                                           translate=None),
                    format='MM-yyyy',
                    ranges=[
                        DateRange(key='minus_10', start='now-10y', start_display=None,
                                  end=None, end_display=None),
                        DateRange(key=None, start='01-1960', start_display=None,
                                  end='01-1970', end_display=None),
                        DateRange(key=None, start=None, start_display=None,
                                  end='now-10M/M', end_display=None)
                    ]
                ),
                RangeAgg(
                    field='metadata.budget',
                    negated=False,
                    must=True,
                    id='budget_r',
                    multi_select=False,
                    ui_only=RangeAggUI(display_name='Budget Range', hide_count=False,
                                       translate=None),
                    ranges=[
                        Range(key='< 10,000,000', start=None, end='10000000'),
                        Range(key='> 10,000,000', start='10000000', end=None),
                        Range(key='50 to 100 millions', start='50000000', end='100000000')
                    ],
                )
            ]
        ),
        DynamicResultsStage(
            engine_name=DEFAULT_ENGINE_NAME,
            enable=True,
            name='hits',
            title=UrlField(field='title', link='url'),
            sub_title=TextField(field='url'),
            thumbnail=None,
            non_displayed_fields=[],
            body=[
                [BoolField(label='Adult', field='adult'),
                 NumberField(label='Popularity', field='metadata.popularity', format='1.2')],
                [TextField(field='overview')],
                [TextField(label='Status', field='status'),
                 DateField(label='Release Date', field='release_date', format='%Y-%m-%d')],
                [ThumbnailsField(field='metadata.cast[*].profile_path_w185',
                                 alt='metadata.cast[*].name',
                                 detail='metadata.cast[*].name',
                                 full_size='metadata.cast[*].profile_path')]
            ],
            search_response=SEARCH_STAGE_NAME
        ),
        QueryStage(
            engine_name=DEFAULT_ENGINE_NAME,
            enable=True,
            qpl_enable=True,
            name=SEARCH_STAGE_NAME,
            index='movies',
            wildcard=False,
            fields=['title', 'overview', 'url', 'status', 'metadata',
                    'metadata.production_companies', 'metadata.cast', 'metadata.directors'],
            range_fields=['metadata.budget'],
            date_fields=['release_date'],
            fetch_fields=['title', 'adult', 'overview', 'url', 'release_date', 'status', 'video',
                          'metadata', 'metadata.production_companies', 'metadata.cast',
                          'metadata.directors'],
            exclude_fields=None,
            implicit_operator='or',
            page_size_default=25,
            sort=SortEntry(field='_score', order=SortOrder.DESC),
            aggregations='aggregations',
            aggregations_filters='aggregations_filters',
            highlight='highlight',  # must match the name of the HighlightStage above
            filters='filters',
            ui_only=QueryStageUIConfig(
                sort=SortConfig(
                    default=SortEntry(field='_score', display_name='Score', order=SortOrder.DESC),
                    options=[
                        SortEntry(field='_score', display_name='Score', order=SortOrder.DESC),
                        SortEntry(field='release_date', display_name='Release Date',
                                  order=SortOrder.ASC),
                        SortEntry(field='metadata.budget', display_name='Budget',
                                  order=SortOrder.DESC)
                    ]
                ),
                page_size=PageSizeConfig(default=25, options=[25, 50, 100])
            )
        )
    ]
)
```
```python
from app.pipeline import Pipeline
from app.pipeline.stages import (HighlightStage, DynamicAggStage, FilterStage,
                                 DynamicResultsStage, QueryStage)
from app.pipeline.stages.query import QueryStageUIConfig
from framework.aggregations import *
from framework.fields import *
from framework.filters import *
from models.engines import HighlightField, SortConfig, PageSizeConfig, SortEntry
from models.utils import SortOrder
from utils.constants import DEFAULT_ENGINE_NAME

################################################################################
# STAGES CONFIGURATION
################################################################################
SEARCH_STAGE_NAME = 'search'

#################
# HIGHLIGHT
#################
_highlight_stage = HighlightStage(
    enable=True,
    name='highlight',
    engine_name=DEFAULT_ENGINE_NAME,
    pre_tags=['<strong>'],
    pos_tags=['</strong>'],
    tags_schema=None,
    force_source=False,
    require_field_match=True,
    number_of_fragments=3,
    fragment_size=100,
    order=None,
    fields=[
        HighlightField(field='title', number_of_fragments=1),
        HighlightField(field='overview', number_of_fragments=5),
        HighlightField(field='url', number_of_fragments=1),
    ]
)

#################
# AGGREGATIONS
#################
_aggregation_stage = DynamicAggStage(
    enable=True,
    name='aggregations',
    filters_name='aggregations_filters',
    save_to_intermediate=False,
    expand_result=False,
    engine_name=DEFAULT_ENGINE_NAME,
    search_response=SEARCH_STAGE_NAME,
    # This generates the aggregations for the UI; if you don't want them, set it to False
    ui_aggregations=True,
    aggs=[
        TermAgg(
            id='genres',
            field='metadata.genres.name.keyword',
            must=True,
            max_values=100,
            multi_select=True,
            operation='or',
            order=None,
            exclude_terms=None,
            include_terms=None,
            ui_only=TermAggUI(display_name='Genres', max_display=5, expanded=True),
            aggs=[
                TopHitsAgg(id='top_hits', type='top_hits'),
                TermAgg(
                    field='metadata.spoken_languages.name.keyword',
                    negated=False,
                    must=True,
                    id='spoken_languages',
                    ui_only=TermAggUI(display_name='Spoken Languages', max_display=5,
                                      expanded=False),
                    aggs=None,
                    max_values=None,
                    multi_select=False,
                    operation='or',
                    order=None,
                    exclude_terms=None,
                    include_terms=None
                )]
        ),
        BoolAgg(
            id='adult',
            field='adult',
            negated=False,
            must=True,
            ui_only=BoolAggUI(display_name='Adult', display_True='True', display_False='False')
        ),
        DateHistogramAgg(
            field='release_date',
            negated=False,
            must=True,
            id='release_date_h',
            ui_only=DateHistogramAggUI(display_name='Release Date'),
            calendar_interval=['1y', '1q', '1M', '1d'],
            format=['yyyy', '\'Q\'q', 'MMMM', 'E dd'],
            time_zone=['-06:00'],
            min_doc_count=[100, 1],
            offset=None,
            order=None
        ),
        HistogramAgg(
            id='budget',
            field='metadata.budget',
            negated=False,
            must=True,
            interval=[10000000, 1000000],
            min_doc_count=[100, 10, 1],
            order=None,
            offset=None,
            ui_only=HistogramAggUI(display_name='Budget')
        ),
        SliderAgg(
            id='budget_slider',
            field='metadata.budget',
            negated=False,
            must=True,
            ui_only=SliderAggUI(display_name='Budget', translate=None, min=None, max=None,
                                show_ticks=True),
        ),
        DateRangeAgg(
            field='release_date',
            negated=False,
            must=True,
            id='release_date_dr',
            ui_only=DateRangeAggUI(display_name='Release Range', hide_count=False, translate=None),
            format='MM-yyyy',
            ranges=[
                DateRange(key='minus_10', start='now-10y', start_display=None,
                          end=None, end_display=None),
                DateRange(key=None, start='01-1960', start_display=None,
                          end='01-1970', end_display=None),
                DateRange(key=None, start=None, start_display=None,
                          end='now-10M/M', end_display=None)
            ]
        ),
        RangeAgg(
            field='metadata.budget',
            negated=False,
            must=True,
            id='budget_r',
            multi_select=False,
            ui_only=RangeAggUI(display_name='Budget Range', hide_count=False, translate=None),
            ranges=[
                Range(key='< 10,000,000', start=None, end='10000000'),
                Range(key='> 10,000,000', start='10000000', end=None),
                Range(key='50 to 100 millions', start='50000000', end='100000000')
            ],
        )
    ]
)

#################
# FILTER
#################
_filter_stage = FilterStage(
    enable=True,
    name='filters',
    engine_name=DEFAULT_ENGINE_NAME,
    save_to_intermediate=False,
    expand_result=False,
    filters=[
        TermFilter(field='adult', negated=False, must=True, value=False, multi_select=False)
    ]
)

#################
# UI RESULTS
#################
_dynamic_results = DynamicResultsStage(
    engine_name=DEFAULT_ENGINE_NAME,
    enable=True,
    name='hits',
    title=UrlField(field='title', link='url'),
    sub_title=TextField(field='url'),
    thumbnail=None,
    non_displayed_fields=[],
    body=[
        [BoolField(label='Adult', field='adult'),
         NumberField(label='Popularity', field='metadata.popularity', format='1.2')],
        [TextField(field='overview')],
        [TextField(label='Status', field='status'),
         DateField(label='Release Date', field='release_date', format='%Y-%m-%d')],
        [ThumbnailsField(field='metadata.cast[*].profile_path_w185',
                         alt='metadata.cast[*].name',
                         detail='metadata.cast[*].name',
                         full_size='metadata.cast[*].profile_path')]
    ],
    search_response=SEARCH_STAGE_NAME
)

#################
# SEARCH
#################
_query_stage = QueryStage(
    engine_name=DEFAULT_ENGINE_NAME,
    enable=True,
    qpl_enable=True,
    name=SEARCH_STAGE_NAME,
    index='movies',
    wildcard=False,
    fields=['title', 'overview', 'url', 'status', 'metadata',
            'metadata.production_companies', 'metadata.cast', 'metadata.directors'],
    range_fields=['metadata.budget'],
    date_fields=['release_date'],
    fetch_fields=['title', 'adult', 'overview', 'url', 'release_date', 'status', 'video',
                  'metadata', 'metadata.production_companies', 'metadata.cast',
                  'metadata.directors'],
    exclude_fields=None,
    implicit_operator='or',
    page_size_default=25,
    sort=SortEntry(field='_score', order=SortOrder.DESC),
    aggregations=_aggregation_stage.name,
    aggregations_filters=_aggregation_stage.filters_name,
    highlight=_highlight_stage.name,
    filters=_filter_stage.name,
    ui_only=QueryStageUIConfig(
        sort=SortConfig(
            default=SortEntry(field='_score', display_name='Score', order=SortOrder.DESC),
            options=[
                SortEntry(field='_score', display_name='Score', order=SortOrder.DESC),
                SortEntry(field='release_date', display_name='Release Date', order=SortOrder.ASC),
                SortEntry(field='metadata.budget', display_name='Budget', order=SortOrder.DESC)
            ]
        ),
        page_size=PageSizeConfig(default=25, options=[25, 50, 100])
    )
)

################################################################################
# PIPELINE STRUCTURE
################################################################################
PIPELINE = Pipeline(
    enable=True,
    stages=[
        _highlight_stage,
        _filter_stage,
        _aggregation_stage,
        _dynamic_results,
        _query_stage
    ]
)
```