This post was originally published by Divij Vaidya at Hacker Noon
In this article, we first explain the requirements for monitoring your big data analytics pipeline and then we go into the key aspects that you need to consider to build a system that provides holistic observability.
Note: This article assumes some basic familiarity with the big data landscape and associated technologies such as Hadoop, YARN, Spark, Presto etc.
In the technology fueled dynamic environment of the 21st century, a business that wants to be successful needs to learn from past results, adapt to current requirements and forecast future trends. These valuable insights into the past, present and future are retrieved by analyzing vast amounts of data collected over time.
Apart from these, the COVID pandemic has taught us that a successful business should be nimble and innovative. Nimbleness comes from the confidence to make the right decision at the right time. Innovation comes from increasing the likelihood of getting lucky by consistently performing experiments. Acquiring both these traits requires us to analyze large scale data quickly and economically. Today, a growing ecosystem of big data technologies such as Spark, Hive, Kafka, Presto etc. fulfills these requirements.
The internet is filled with articles about the usage of data processing and analytics frameworks. But there is a shortage of commentary about the auxiliary systems that augment the power of big data technologies. These systems are force multipliers for a better user experience in the big data ecosystem. They provide infrastructure cost attribution, data set discovery, data quality guarantees and user access auditing, and they extract insights into (aka provide observability for) the analytical applications which run on the big data stack (henceforth called big data applications).
This article covers some key ideas to gain observability into big data applications. We define the scope of requirements, present some known challenges and discuss high level solutions.
The big data stack can be broadly divided into the following layers, from the top of the hierarchy to the bottom: query processing, execution framework, resource manager and storage. These independent sub-systems work together to execute an analytical query (big data application).
Layers in the full stack for a big data application — (Image source)
Data application observability is the ability to extract insights on the characteristics of big data applications. These insights lead to:
- lower infrastructure cost by improving resource utilization
- increased user productivity by ease of debugging and troubleshooting
- improved availability by reducing mean time to recovery and
- boosted security by providing auditing and accountability.
The user personas for data application observability can be broadly bucketed into the categories described below. Each persona has different requirements for data freshness, query latency, and the type and granularity of information.
An end user is a person who runs queries on the data platform. They could be running an ad hoc query on data using a technology such as Presto, Hive, Impala, Spark SQL etc. or could be building a Business Intelligence (BI) dashboard. The job family for this user could be data scientist, software engineer or analyst. This category of user requires metrics & data (henceforth referred to as observability statistics) for troubleshooting failures or optimizing the performance of their jobs. Some examples of questions that this user persona asks are:
User persona: “End user” writing hive queries to extract insights from data — (Image Source)
What is the execution status of my query?
How can I make my query run faster?
What is the best time of the day to run my query based on the traffic in my team’s YARN queue?
How much does running this query cost my organization?
Is the behavior of this query different from the past? Why?
What is the internal execution plan for this query?
How can I debug and fix my failed query?
Is the capacity allocated to this query sufficient to satisfy the query SLA?
Where can I find the results of a query that I ran in the past?
Local Resource Administrator
A local resource administrator is the person who administers infrastructure resources at the org/team/project level. In a large organization, it is important to attribute cost to the various projects/teams so that the return on investment (ROI) can be analyzed. Local resource administrators ensure that the ROI is favorable for their org/team/project.
This user persona requires observability statistics for planning infrastructure capacity and maintaining cost efficiency for their organization. They are interested in finding answers to the following types of questions:
User persona: “Local Resource Administrator” monitoring the usage of infrastructure across organization — (Image source)
What is the utilization of the YARN queue allocated to my organization?
Who (user) is running the most expensive queries on a YARN queue?
How much is the total cost to my org for usage of the big data infrastructure?
Which app is creating a large number of small files in HDFS?
How much capacity should we reserve for this new project?
What is the right amount of capacity reservation to meet the SLA?
How many applications have failed in the past month because of resource limitations?
I am running out of space in HDFS. Do I have any unused data that I can delete?
What are the top 10 applications consuming the most CPU/memory in my YARN queue?
What are all the applications running in the YARN queue?
How many queries did this user run in the last 24 hours?
Which are the most accessed tables from this YARN queue?
What are the applications waiting for resource allocation in this YARN queue?
What are the queries running in this queue sorted by longest time?
User persona: “Oncall operator” fixing high priority critical incidents – (Image Source)
An oncall operator is responsible for responding to critical incidents which might impact the availability, performance or reliability of their service. Their objective is to mitigate the problem as early as possible (reduce MTTR) with minimal impact on the end users.
To effectively troubleshoot and debug an incident, the oncall operator requires visibility into the current state of the systems at service level and cluster level granularity, along with the ability to drill down to query level granularity. They are interested in anomaly metrics to detect the potential cause of inconsistency in the system. Some examples of questions that they ask are:
The system is running out of memory. What is the rogue query that is consuming the majority of memory?
This application didn’t make any progress in the last one hour. Where is it stuck?
What is the SLA miss rate per Hadoop cluster?
How many applications are failing per Hadoop cluster?
What are all the queries running in this cluster sorted by longest time?
What are all the queries running in this cluster?
Big data applications are complex. A typical workload goes through multiple distributed systems across the depth of the stack. These systems coordinate together for processing a single big data workload. Due to the involvement of numerous complex systems, operational use cases such as troubleshooting failures, identifying performance bottlenecks, finding security holes etc. become hard problems to solve.
Based on the requirements of the different user personas, the requirements for observability of big data applications can be classified into different use cases. To illustrate the complexity associated with these use cases, we use an example big data workload.
Consider a Spark application which is created from a Jupyter notebook by a data scientist. The application reads data from two different Hive tables, performs some business logic and then writes the result to a CSV file. The big data stack involved in processing the application is Jupyter Notebook -> PySpark -> Apache Livy -> Spark -> YARN -> HDFS.
Application Failure Troubleshooting
In the example application, a failure might occur at any layer. Let’s assume that, due to incorrect permissions on the HDFS folder containing dependencies for the Spark application, Livy is not able to download (read) the files configured via the --py-files option of spark-submit. This leads to an application failure. To begin investigating the root cause of the failure, the user would require visibility into application characteristics such as:
- the full software stack for the application, so that they can understand that a service like Livy sits in the middle of the application execution stack
- application logs at each layer, so that they can determine that the error occurred at Livy
- the parameters that were passed to Livy, so that they can determine which files Livy was trying to access from HDFS
This example illustrates the difficulty experienced by a user when they troubleshoot a failure in the big data application.
Users rely on the observability insights to dive deeper into the characteristics of their applications and analyze the causes of failures. These insights give them a complete, top level picture of what is happening with the workload.
Application Performance Optimization (Cost Efficiency)
In the example application, let’s assume a scenario where the YARN queue assigned to the Spark application is overloaded and, as a result, the application is queued for a long time before it starts to execute. From our end user’s perspective the overall application is running slower than usual. To begin troubleshooting the performance bottleneck, the user would require visibility into application characteristics such as:
- full software stack for the application so that they can understand that the execution is stuck at YARN layer
- execution breakdown at each layer of the stack so that they can understand that upstream layers from YARN processed the application in usual time
- local execution status at each layer so that they can understand that the application is in queued state at YARN
- resource utilization at each layer so that they can understand that the application is asking for 1000 executors from YARN which are not currently available in the queue
Additionally the user would need metadata such as which YARN queue was assigned to the application, what were the configuration parameters passed to Spark etc. Although this hypothetical scenario exhibits a simple performance bottleneck, it illustrates the kind of granular observability required to troubleshoot performance in a big data application.
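The per-layer execution breakdown described above can be sketched as follows. This is a minimal illustration, not a reference implementation: the `LayerSpan` record, the `slowest_layer` helper, the timings and the per-layer baselines are all hypothetical, and a real system would derive them from the metrics collected at each layer.

```python
from dataclasses import dataclass


@dataclass
class LayerSpan:
    """Time one layer of the stack spent on the application (hypothetical record)."""
    layer: str      # e.g. "Livy", "YARN", "Spark"
    state: str      # local execution status reported by that layer
    start_s: float  # seconds since the application was submitted
    end_s: float


def slowest_layer(spans, baseline_s):
    """Return (layer, duration, ratio) for the layer that exceeds its usual
    duration by the largest factor."""
    worst = None
    for span in spans:
        duration = span.end_s - span.start_s
        ratio = duration / baseline_s[span.layer]
        if worst is None or ratio > worst[2]:
            worst = (span.layer, duration, ratio)
    return worst


# Made-up timings for the example application: YARN keeps it queued for 15 minutes.
spans = [
    LayerSpan("Livy", "submitted", 0.0, 2.0),
    LayerSpan("YARN", "ACCEPTED", 2.0, 902.0),
    LayerSpan("Spark", "RUNNING", 902.0, 1202.0),
]
# Assumed historical durations per layer, in seconds.
baseline_s = {"Livy": 2.0, "YARN": 30.0, "Spark": 300.0}

layer, duration, ratio = slowest_layer(spans, baseline_s)
print(f"{layer} took {duration:.0f}s, {ratio:.0f}x its usual time")
```

With these made-up numbers the upstream and downstream layers match their usual durations, so the breakdown points at the YARN layer as the place to investigate.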
Users rely on the observability insights to dive deeper into the characteristics of their applications to find sub optimal configurations or performance bottlenecks. Optimizing these bottlenecks results in faster applications, which in turn improves the cost efficiency of the underlying hardware resources.
In the example application, for any troubleshooting or monitoring, the user has to correlate the notebook identifier with the Livy session ID, which should be mapped to the Spark application ID, which in turn should be mapped to the YARN application ID.
Users of other auxiliary big data systems such as user access auditing or data lineage rely on the observability insights to determine the application lineage across the depth of the big data stack.
As an example, to find the users who have accessed a particular Hive table, a mapping is required between the HDFS NameNode API calls and the Spark application ID.
Fair allocation of resources amongst big data applications of different priority is achieved through abstractions such as YARN queues or Hadoop clusters.
Observability into the utilization of YARN queues and Hadoop clusters is essential to ensure a fair distribution of resources across users.
It can prevent scenarios such as critical jobs suffering from resource starvation, resource hogging by a single user, runaway query consuming majority of resources etc.
Collection and publishing of metrics is not a standard feature in big data systems. The degree of support for collecting and emitting metrics varies across a spectrum. There are systems such as YARN which collect metrics and expose them through an API, systems such as Hive & Spark which provide hooks into the execution lifecycle of a query, and systems like HDFS that do not provide any significant monitoring feature out of the box. Depending on the set of systems used in your organization, getting end to end holistic observability into an application might require devising custom solutions for collecting and publishing the metrics. Writing these custom solutions requires an in-depth understanding of the otherwise opaque (it-just-works) open source system, since inefficient metric collection code can significantly hamper the execution performance of the system.
Processing the metrics collected from these heterogeneous systems becomes complex due to lack of a common vocabulary to define the metrics. Each system across the depth of the stack emits the metrics as loosely typed strings or integers. Extensive raw data processing and normalization is required to provide holistic cross stack visibility to the users.
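As a rough illustration of what normalizing into a common vocabulary could look like, the sketch below maps loosely typed raw values onto one typed record with consistent names and units. The `Metric` record, the `VOCABULARY` table and the common metric names are assumptions made for this example. The raw names `allocatedMB` and `elapsedTime` follow the YARN ResourceManager REST API and `executorRunTime` follows Spark task metrics, as I understand them.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Metric:
    """One metric expressed in the common vocabulary (hypothetical schema)."""
    source: str
    app_id: str
    name: str
    value: float
    unit: str


def _ms_to_s(value):
    return value / 1000.0


def _mb_to_bytes(value):
    return value * 1024 * 1024


# Hypothetical vocabulary: (source system, raw name) -> (common name, unit, converter).
VOCABULARY = {
    ("yarn", "allocatedMB"): ("memory_allocated", "bytes", _mb_to_bytes),
    ("yarn", "elapsedTime"): ("elapsed_time", "seconds", _ms_to_s),
    ("spark", "executorRunTime"): ("executor_run_time", "seconds", _ms_to_s),
}


def normalize(source, app_id, raw):
    """Convert raw string or integer values into typed metrics.

    Raw names that are not in the vocabulary are skipped."""
    metrics = []
    for raw_name, raw_value in raw.items():
        rule = VOCABULARY.get((source, raw_name))
        if rule is None:
            continue
        name, unit, convert = rule
        metrics.append(Metric(source, app_id, name, convert(float(raw_value)), unit))
    return metrics


raw = {"allocatedMB": "2048", "elapsedTime": 60000, "someUnknownField": 12.5}
for metric in normalize("yarn", "application_234", raw):
    print(metric)
```

Keeping the mapping in a table rather than in code paths makes it easier to add a new source system without touching the pipeline logic.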
The volume of metrics does not grow linearly with the number of jobs. Each job emits many kinds of metrics, and each metric fans out across several dimensions, so the volume can grow much faster than the job count. The observability system should be able to handle this scale with realtime latencies.
Depending on the open source platform, the metrics retention on the server varies from a few minutes to a few hours. If the metrics are not collected within this duration, they are lost forever. The observability system should be highly available and robust in its collection mechanism as an availability drop of a few minutes could lead to permanent data loss.
Traditional observability systems designed for web services do not satisfy the requirements of providing observability for big data applications. The cardinality of metrics, the kinds of metrics and the types of signals other than metrics vary for a big data application as execution moves from one sub-system to another. Traditional observability systems support limited cardinality (distinct set of values) and a limited maximum number of dimensions for a metric. In the case of big data, Spark alone has more than 350 configuration parameters which could be used as dimensions. An ideal big data application observability system would be able to correlate the metrics emitted across multiple stacks to provide a unified view of the execution model for the application.
Lastly, tracing the application across the stack layers is a challenge due to the lack of universal tracing amongst open source applications. Each layer in the stack creates its own unique ID and it is left to an external system to map the IDs to each other. As an example, a Hive query might have an ID such as hive_123 while the YARN application ID would be application_234 and the Livy batch ID might be batch_789. But all these IDs refer to the same Hive query, which uses Apache Livy & Spark behind the scenes for execution.
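To get a feel for why cardinality becomes a problem, the worst case number of distinct time series for a single metric is the product of the number of distinct values of each dimension attached to it. The dimension names and counts below are invented purely for illustration.

```python
from math import prod


def series_cardinality(distinct_values_per_dimension):
    """Upper bound on the distinct time series for one metric: the product of
    the number of distinct values of every dimension."""
    return prod(distinct_values_per_dimension.values())


# Invented dimension sizes for one metric.
dimensions = {"cluster": 5, "queue": 40, "user": 500, "application_state": 8}
print(series_cardinality(dimensions))  # 5 * 40 * 500 * 8 = 800000
```

Every additional dimension multiplies this bound, which is why using even a small fraction of the available configuration parameters as dimensions quickly exceeds what a system built for web service metrics expects.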
There are some recent efforts to integrate Jaeger with Hadoop in the open source world but the universal tracing hasn’t covered the entire depth of the stack yet.
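Until universal tracing is available, the mapping between per-layer IDs has to be maintained by the observability system itself. One possible sketch, assuming each collector can report pairs of IDs it knows to be related, is a union-find structure that groups all IDs belonging to the same application. The `IdCorrelator` class is hypothetical, and the IDs are the illustrative ones used above.

```python
class IdCorrelator:
    """Groups IDs from different layers that refer to the same application."""

    def __init__(self):
        self._parent = {}

    def _find(self, item):
        # Register unseen IDs as their own group, then walk up to the root.
        self._parent.setdefault(item, item)
        while self._parent[item] != item:
            self._parent[item] = self._parent[self._parent[item]]  # path halving
            item = self._parent[item]
        return item

    def link(self, id_a, id_b):
        """Record that two IDs refer to the same application."""
        root_a, root_b = self._find(id_a), self._find(id_b)
        if root_a != root_b:
            self._parent[root_b] = root_a

    def group(self, any_id):
        """Return every known ID correlated with the given one, sorted."""
        root = self._find(any_id)
        return sorted(i for i in list(self._parent) if self._find(i) == root)


correlator = IdCorrelator()
correlator.link("hive_123", "batch_789")         # e.g. reported by a Hive hook
correlator.link("batch_789", "application_234")  # e.g. reported by a Livy collector
correlator.link("hive_999", "application_555")   # an unrelated query

print(correlator.group("application_234"))
# ['application_234', 'batch_789', 'hive_123']
```

Because links can arrive in any order and from different collectors, a structure that merges groups incrementally is convenient here. A production system would also need persistence and expiry, which this sketch leaves out.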
There are three milestones in the journey towards providing observability for data applications. Each milestone is a necessary building block for the next and provides incremental value to the user.
Milestones towards holistic Data Application Observability — (Image created by author)
Milestone 1: Visibility into application characteristics
At the completion of this milestone, users get granular visibility into the raw metrics associated with the lifecycle of an application. The task of interpreting the metrics and extracting useful insights is left up to the user.
From a technical standpoint, this milestone creates a number of building block components of the entire observability system. This article explores three high level components without going into technical implementation details (currently, there is no complete open source implementation which could be cited as an example here).
The user facing component should allow users to consume information both visually (UI) and programmatically (APIs). An intuitive user interface ties together different metrics collected from across the stack to provide holistic observability into the application. A user of the interface might not necessarily know the under the hood details of the application and thus, the user experience should cater to users of different technical acumen. As an example, a marketing executive running a Hive SQL query to get results for a campaign does not know that the Hive query was executed on a YARN cluster using the Spark processing engine. They are only interested in checking from the interface whether their query has completed.
On the other end of the spectrum, an engineer from the Hive team would be interested in exploring the Hive query execution plan or looking at the metrics of the YARN containers while the query was being executed. Some open source systems ship with their own in-built UI such as Spark Web UI, Hadoop Web UI or Tez UI. These interfaces cater to users who have in-depth knowledge of these systems.
I do not recommend taking user experience inspiration from these UIs for observability systems which cater to the user personas discussed earlier in the article.
Open source Spark web UI — Image source
Collectors perform realtime collection of metrics across the different layers of the stack. They guarantee ordering of lifecycle events and at-least-once collection of events. We have discussed the challenges associated with collectors earlier in the article. Custom implementation is required at each layer: you could leverage Hive hooks or Spark hooks to obtain basic lifecycle metadata for an application, write custom periodic collectors to gather information from the YARN APIs, devise custom agents to parse and publish the HDFS audit logs, and write custom agents to extract CPU/memory utilization for Spark executors.
Although multiple software as a service (SaaS) providers offer the functionality described here, very limited support is available for these collectors in the open source domain. Projects which could be leveraged as example implementations include Apache Atlas for Hive hooks, Apache Ambari for basic cluster monitoring and the Uber JVM profiler for resource consumption by Spark executors.
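As a small illustration of a periodic collector for YARN, the sketch below parses a response shaped like the one returned by the ResourceManager REST endpoint `/ws/v1/cluster/apps` and ranks applications by allocated memory. The field names (`id`, `user`, `queue`, `state`, `allocatedMB`, `allocatedVCores`) follow that API as I understand it. The sample payload is trimmed and invented, the helper names are hypothetical, and the HTTP polling, scheduling and publishing parts of a real collector are deliberately left out.

```python
import json


def parse_yarn_apps(response_text):
    """Extract the fields we care about from a YARN 'apps' JSON response."""
    payload = json.loads(response_text)
    # The "apps" value can be null when no application matches the query.
    apps = (payload.get("apps") or {}).get("app", [])
    return [
        {
            "app_id": app["id"],
            "user": app["user"],
            "queue": app["queue"],
            "state": app["state"],
            "allocated_mb": app.get("allocatedMB", 0),
            "allocated_vcores": app.get("allocatedVCores", 0),
        }
        for app in apps
    ]


def top_consumers(apps, n=10):
    """Return the n applications holding the most memory right now."""
    return sorted(apps, key=lambda app: app["allocated_mb"], reverse=True)[:n]


# Trimmed, invented sample of what the ResourceManager might return.
sample_response = json.dumps({"apps": {"app": [
    {"id": "application_234", "user": "alice", "queue": "marketing",
     "state": "RUNNING", "allocatedMB": 8192, "allocatedVCores": 4},
    {"id": "application_235", "user": "bob", "queue": "marketing",
     "state": "RUNNING", "allocatedMB": 65536, "allocatedVCores": 32},
    {"id": "application_236", "user": "carol", "queue": "marketing",
     "state": "ACCEPTED", "allocatedMB": 0, "allocatedVCores": 0},
]}})

for app in top_consumers(parse_yarn_apps(sample_response), n=2):
    print(app["app_id"], app["user"], app["allocated_mb"])
```

Since the server retains this information only for a limited time, a real collector would poll on a short interval and publish each snapshot to a durable queue before doing any further processing.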
Milestone 1: Components of Data Application Observability system — (Image created by author)
Pipelines for metrics enrichment bridge the gap between the user interface and the metric collectors. This component ingests the metrics published by the collectors, processes the metrics and stores them so that the user interfaces could read them. This component should be able to handle the scale of metrics being ingested into the system and should have the ability to process each metric without impacting the metric propagation time to the end user.
The requirement for metric propagation time to end user aka metric freshness varies from use case to use case. For some use cases such as alerting when an application is consuming 90% of YARN queue resources, the metric freshness should be in the order of seconds while use cases such as performance optimization could be solved by a metric freshness in the order of minutes.
The processing phase of the metrics involves cleaning the metrics, normalizing the metrics into a standard vocabulary and enriching the data by gathering auxiliary information from alternate sources. Some examples of processing & enrichment tasks are:
- uploading the query plan size of 1MB to a blob store instead of passing it on the wire across the pipelines
- enriching the HDFS audit logs by retrieving the mapping between hive table and HDFS files from the Hive MetaStore (HMS)
- joining the information from different metrics streams to create an application lineage
- calculating the cost of a job based on utilization metrics
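The HDFS audit log enrichment mentioned above could look roughly like the sketch below. It assumes the tab separated key=value layout that HDFS audit log lines typically use, and it assumes the table locations have already been fetched from the Hive MetaStore into a plain dictionary. The helper names, the sample log line and the table locations are invented for illustration.

```python
def parse_audit_line(line):
    """Extract the key=value fields from one HDFS audit log line."""
    _, _, body = line.partition("FSNamesystem.audit: ")
    fields = {}
    for token in body.strip().split("\t"):
        key, separator, value = token.partition("=")
        if separator:
            fields[key] = value
    return fields


def table_for_path(path, table_locations):
    """Map an HDFS path to a Hive table using the longest matching location."""
    best = None
    for location, table in table_locations.items():
        prefix = location.rstrip("/") + "/"
        if path == location or path.startswith(prefix):
            if best is None or len(location) > len(best[0]):
                best = (location, table)
    return best[1] if best else None


def enrich(line, table_locations):
    """Turn a raw audit line into a (user, command, table) record."""
    fields = parse_audit_line(line)
    return {
        "user": fields["ugi"].split(" ")[0],  # drop the "(auth:...)" suffix
        "cmd": fields["cmd"],
        "table": table_for_path(fields["src"], table_locations),
    }


# Hypothetical table locations, as they might be read from the Hive MetaStore.
table_locations = {
    "/warehouse/sales.db/orders": "sales.orders",
    "/warehouse/sales.db/orders_archive": "sales.orders_archive",
}
# Invented audit line in the usual tab separated layout.
audit_line = (
    "2021-01-05 10:15:00,123 INFO FSNamesystem.audit: allowed=true\t"
    "ugi=alice (auth:SIMPLE)\tip=/10.0.0.7\tcmd=open\t"
    "src=/warehouse/sales.db/orders/part-00000\tdst=null\tperm=null\tproto=rpc"
)
print(enrich(audit_line, table_locations))
```

Matching on a directory boundary rather than a bare string prefix keeps files under `orders_archive` from being attributed to `orders`.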
Milestone 2: Actionable insights
At this stage, the system provides users with insights derived from the raw data. This saves the user the effort of sifting through the plethora of raw metrics to find the significant ones. The system is able to suggest actions for the user because it understands the underlying correlation of the metrics.
In the previous milestone, the intuitive user interface displays complete end to end observability to the user. At this milestone the interface changes from a research driven interface to a workflow based interface. A research driven interface displays all the possible metrics and lets the user interpret them, while a workflow based interface takes the user into a funnel related to an anomaly, at the end of which the anomaly has been analyzed and, if required, fixed.
Milestone 2: Components of Data Application Observability system — (Image created by author)
A new “insight extraction” component is added to the metric processing pipelines and is triggered on receiving critical events such as a lifecycle state change. This component runs a set of rules to determine whether a metric is anomalous. These rules could either be static or generated by a Machine Learning model.
The full implementation for such a complex component is out of the scope for this article but to demonstrate a proof of concept, let us revisit the example we discussed earlier in the article (Use Cases section) where an application might run slower than usual or time out due to YARN queue overload. On receiving periodic events associated with queued time for an application, this new component would compare it against the average queue time for an application in the YARN queue which is derived based on historical information. If the current queue time is higher, this system could retrieve the current YARN queue utilization to derive the top 5 longest running applications on the queue and top 5 applications consuming the most resources. Based on this data, the user could be entered into a workflow in the interface where they would be given options to either kill one of the top 5 apps in the queue or schedule their job to run at a different time suggested by the component based on historical queue usage pattern.
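A static rule for the queue overload scenario above might be sketched as follows. The function name, the record fields, the action labels and the `threshold` multiplier are all assumptions made for this illustration. The scenario only says the current queue time is compared against the historical average, so the factor of two used here is an arbitrary choice.

```python
from statistics import mean


def queue_delay_insight(app_id, queued_s, historical_queued_s, running_apps,
                        threshold=2.0, top_n=5):
    """Return an insight if the app has been queued for unusually long, else None.

    threshold is an assumed multiplier over the historical average queue time."""
    baseline_s = mean(historical_queued_s)
    if queued_s <= threshold * baseline_s:
        return None
    longest_running = sorted(
        running_apps, key=lambda app: app["elapsed_s"], reverse=True)[:top_n]
    top_memory = sorted(
        running_apps, key=lambda app: app["allocated_mb"], reverse=True)[:top_n]
    return {
        "app_id": app_id,
        "queued_s": queued_s,
        "baseline_s": baseline_s,
        "longest_running": [app["app_id"] for app in longest_running],
        "top_memory": [app["app_id"] for app in top_memory],
        "suggested_actions": ["kill_one_of_top_apps", "reschedule_job"],
    }


# Invented snapshot of the applications currently running in the queue.
running_apps = [
    {"app_id": "application_1", "elapsed_s": 7200, "allocated_mb": 4096},
    {"app_id": "application_2", "elapsed_s": 600, "allocated_mb": 131072},
    {"app_id": "application_3", "elapsed_s": 3600, "allocated_mb": 8192},
]

insight = queue_delay_insight(
    "application_234", queued_s=600, historical_queued_s=[20, 30, 40],
    running_apps=running_apps, top_n=2)
print(insight)
```

The returned record is what the workflow based interface would consume: it carries both the evidence (queue time versus baseline, the heaviest applications) and the candidate actions to offer the user.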
Milestone 3: Self healing
At this stage, the system performs the actions that it derived in Milestone 2 instead of relying on the end user to perform them manually. The results of the analysis performed by the “insight extraction” component act as a feedback loop to the big data frameworks. Based on the feedback, these frameworks automatically tune the parameters for the application.
Milestone 3: Components of Data Application Observability system — (Image created by author)
To illustrate the functionality, let us consider an example where you run a Spark job and it runs slowly. The achievements of Milestone 1 provide you with information about the execution timeline of the Spark job. The systems developed in Milestone 2 highlight the longer than usual execution time. Specifically, they indicate that the majority of the execution time is spent on writing results to HDFS. To remedy this, the system would also suggest that you set the repartition flag to 1. During this milestone, the system provides feedback to the Spark execution engine so that future runs of this application are executed with the repartition flag set to 1. This leads to self healing for the application without any user intervention.
With the data processing systems becoming critical to modern online businesses, understanding the characteristics of big data applications is crucial to maintain a robust, efficient, secure, easy to use and performant data analytics platform. Hopefully this article has provided you with a glimpse into the opportunities that observability into your data applications could unlock and guided you towards a path in the journey to achieve the desired objective.
Disclaimer: All opinions and thoughts expressed in the article are my own and do not reflect the opinions of my past or present employer.