Chief Architect: Distributed scheduling for big data

2020-05-13

Author: Liang Fukun

Distributed scheduling plays a central, end-to-end role in big data ETL: the production, delivery, and consumption of data all run through it. This article starts from the characteristics of scheduling in general and distributed scheduling in particular, then elaborates on what is specific to big data scheduling, and finally shares, from a practical standpoint, how to build a highly available, efficient, and flexible big data scheduling platform that meets both architectural requirements and business scenarios.

1. Scheduling


The study of scheduling problems has attracted scientific attention since the 1950s in mathematics, operations research, and engineering [1]. Scheduling has mainly been studied from a mathematical perspective and is usually defined as "allocating a set of resources to perform a set of tasks" in order to obtain the optimal execution time or cost [2]. On a computer, scheduled tasks can be triggered by the operating system's timer facilities (for example, Crontab on Linux), which mainly handle single-task triggering. The most basic requirement is that a task is triggered on time or by an event at least once (at-least-once); if the result does not meet expectations, the application side has to retry. It is much harder to guarantee that a task runs on time, succeeds, and is never executed more than once (exactly-once). In many business scenarios, however, it is both acceptable and necessary to design for repeatable execution with guaranteed consistency rather than to insist on exactly-once semantics. Take routine merchant settlement that is triggered every minute: since each run looks for unsettled merchants within a 30-minute time window, a delay of up to 30 minutes can be tolerated, and running the job several times will not pay a merchant twice, because paying the settlement and flipping the "settled" flag can be designed as one atomic operation. In other words, besides making the scheduler trigger on time and correctly, there are architectural choices on the business side that ensure eventual consistency.
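As a minimal sketch of that idea, the snippet below (hypothetical schema, SQLite as a stand-in for the real settlement store) makes the settlement idempotent: the payment decision and the flag flip happen in one atomic UPDATE, so an at-least-once trigger can safely re-run the same batch.

```python
# Minimal sketch: idempotent settlement under at-least-once triggering.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE settlement (merchant_id TEXT PRIMARY KEY, amount REAL, settled INTEGER DEFAULT 0)")
conn.executemany("INSERT INTO settlement VALUES (?, ?, 0)", [("m1", 120.0), ("m2", 80.5)])

def settle_merchant(merchant_id):
    """Pay a merchant at most once: the flag flip doubles as a compare-and-set."""
    with conn:  # one transaction
        cur = conn.execute(
            "UPDATE settlement SET settled = 1 WHERE merchant_id = ? AND settled = 0",
            (merchant_id,),
        )
        if cur.rowcount == 1:
            # ... trigger the actual payment here; a retry of the whole job
            # sees settled = 1 and skips this merchant.
            return True
    return False

# The scheduler may fire this job more than once (at-least-once semantics);
# the second run is a no-op for merchants that were already settled.
for _ in range(2):
    print([settle_merchant(m) for m in ("m1", "m2")])
# first run -> [True, True], second run -> [False, False]
```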

If an application scenario involves upstream and downstream cooperation, tasks that are executed across different hosts, or high availability requirements for the tasks themselves, a distributed scheduling architecture needs to be introduced.

2. Distributed scheduling


Distributed scheduling evolved from single-machine scheduling, gradually taking high availability, efficiency, and distributed collaboration into account. Moving from single-point scheduling to distributed collaboration is a qualitative change that introduces many features a single machine does not have. The key points are discussed below:

Figure 1 Component breakdown of distributed scheduling

2.1 Scheduler decentralization & high availability

Coordinating distributed scheduling requires a scheduling-center node. To keep it highly available, the scheduling center itself must be deployed across multiple nodes, using an active/standby arrangement to remove the single point of dependence.

2.2 Host selection

In the execution stage of distributed scheduling, a task may run on any of the candidate hosts: M hosts are selected out of N (N >= M >= 1), and the selected hosts cooperate on the same type of task. This is especially common in MPP (Massively Parallel Processing) architectures, which divide and conquer a large task so that it finishes quickly. There are also scenarios (such as merchant settlement on a takeaway platform) that, for consistency and accuracy, may only be executed by a single host and must be executed successfully.

Passive selection strategy: under passive selection, the scheduler generally picks hosts randomly or in round-robin order, or allocates based on how many tasks each host is currently running. More advanced strategies can take the host's processing capacity (throughput and response time) and resource usage (CPU, memory, disk I/O, network I/O, etc.) into account as a feedback mechanism for dynamic allocation. The latter requires a central node that stores each host's capacity and resource situation, so it can serve as a reference during the decision.

Active selection strategy: active selection by the hosts allows richer election strategies. When a task reaches a specific operator, it is clearly defined how many hosts need to participate, and lock preemption is implemented with ZooKeeper's distributed lock: a host that grabs the lock executes, and the others give up. This strategy gives hosts more participation and reduces their dependence on the scheduler. It also avoids the problem of passive selection where a host is chosen even though it does not currently satisfy the execution conditions and therefore cannot execute in time.
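As an illustrative sketch of the active strategy (assuming the kazoo client library and a ZooKeeper ensemble reachable at zk_hosts; the lock path and task runner are hypothetical), each candidate host tries a non-blocking lock acquisition and only the winner executes:

```python
# Active host selection: preempt a ZooKeeper-backed lock per task instance.
from kazoo.client import KazooClient

def try_execute(task_id, host_id, execute_fn, zk_hosts="127.0.0.1:2181"):
    """Return True if this host won the lock and ran the task, else False."""
    zk = KazooClient(hosts=zk_hosts)
    zk.start()
    lock = zk.Lock("/scheduler/locks/" + task_id, identifier=host_id)
    try:
        # Non-blocking attempt: success -> run the task; failure -> give up,
        # another host has already claimed this task instance.
        if lock.acquire(blocking=False):
            try:
                execute_fn(task_id)   # the actual task body, supplied by the caller
                return True
            finally:
                lock.release()
        return False
    finally:
        zk.stop()
        zk.close()
```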

2.3 Task failover

From the job level down to the transformer and operator of a scheduled task, local failures can occur anywhere along the chain. After retries on the original target host fail, the scheduler needs to fail the task over to another standby host to ensure it is executed successfully.

2.4 Execution operator abstraction

Stand-alone task scheduling used to execute a variety of task types quite flexibly: scripts, WebService calls, HDFS client command lines, and so on. For distributed collaboration, however, the execution unit needs to receive external commands, which requires the operator to offer its invocation service over a standard communication protocol. Conventional protocols such as WebService and RPC (Thrift / Protocol Buffers) are widely used for cross-language communication. The concrete execution unit can therefore be an abstraction of a specific task: for example, a REST API operator where the executor fills in the URL and parameters, maximizing flexibility, or a database operator that carries the database credentials and the specific SQL to run. Abstracting operators this way satisfies both standardization and flexibility. Flexibility, however, is a double-edged sword: it serves users' needs to the greatest extent, but it also prevents the big data layer from perceiving when table and field data are complete, so data output cannot be delivered at a finer granularity.
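A rough sketch of that abstraction is shown below. The class and field names are illustrative, not the platform's actual API; urllib and SQLite stand in for a real HTTP client and database driver. Every operator exposes the same execute() contract, while the caller supplies the flexible parts (URL and parameters, or connection info and SQL).

```python
# Illustrative operator abstraction: one contract, pluggable task types.
from abc import ABC, abstractmethod
import json
import sqlite3
import urllib.request

class Operator(ABC):
    @abstractmethod
    def execute(self, params):
        """Run one unit of work and return a result the scheduler can record."""

class RestApiOperator(Operator):
    def __init__(self, url):
        self.url = url                      # filled in by the executor
    def execute(self, params):
        req = urllib.request.Request(
            self.url,
            data=json.dumps(params).encode(),
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(req) as resp:
            return {"status": resp.status, "body": resp.read().decode()}

class SqlOperator(Operator):
    def __init__(self, dsn, sql):
        self.dsn, self.sql = dsn, sql       # credentials/DSN plus the SQL to run
    def execute(self, params):
        with sqlite3.connect(self.dsn) as conn:   # stand-in for the real engine driver
            cur = conn.execute(self.sql, params)
            return {"rowcount": cur.rowcount}
```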

2.5 Elastic scaling

The hosts that actually execute tasks need to scale elastically at the scheduling level; scaling out is mainly about high availability and spreading the task load across more machines. When selecting the target hosts of a cluster, the target set can be specified as a concrete IP list or as a BNS entry (Baidu's machine name service). The IP-list approach is simple and intuitive, but every adjustment requires changing the scheduling system's configuration and refreshing the hosts afterwards. The BNS approach is simpler: it is tied to online service release and deployment, with no delayed deployment or refresh, so integrating through BNS is recommended.

2.6 Trigger mechanism

Conventional triggering follows Crontab syntax, with an execution interval or specific time plus start-time and deadline parameters. In distributed scheduling tasks, however, the most important thing is collaboration, so a more advanced trigger mechanism is needed: forming upstream/downstream dependency triggers, which is a key step in distributed collaboration. The initial task node is triggered on its regular schedule, and the downstream nodes form a dependency chain. More advanced still, a downstream node can subscribe to only some of the upstream's firing frequencies: for example, if the upstream runs at minute 12 of every hour, the downstream can choose to be triggered only at 2:12 and 4:12 rather than every hour at :12. All three methods have their scenarios on the takeaway big data platform, and the architecture is designed to deliver flexibly on all three requirements.
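The frequency-matching part of that dependency trigger can be sketched as below (class and parameter names are hypothetical): the downstream node listens to the upstream's completion events but only accepts the ones matching its own, sparser schedule.

```python
# Dependency trigger with frequency filtering: upstream fires hourly at :12,
# this downstream only reacts to the 02:12 and 04:12 completions.
from datetime import datetime

class DependencyTrigger:
    def __init__(self, accepted_hours=None):
        # None means "accept every upstream completion signal"
        self.accepted_hours = accepted_hours

    def should_fire(self, upstream_finished_at):
        if self.accepted_hours is None:
            return True
        return upstream_finished_at.hour in self.accepted_hours

downstream = DependencyTrigger(accepted_hours={2, 4})
print(downstream.should_fire(datetime(2020, 5, 13, 2, 12)))   # True  -> 02:12 accepted
print(downstream.should_fire(datetime(2020, 5, 13, 3, 12)))   # False -> 03:12 ignored
```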

2.7 Blocking mechanism

For running instances of the same task at different times, the previous instance may not yet have ended when the next one is due. This happens with high-frequency calls or when a third-party dependency fails or is delayed; if calls keep piling up, the call chain deteriorates. To prevent this, the blocking mechanism provides three modes: run in sequence as usual (the default), discard the new instance, or discard the previous instance. The latter two both require a fault-tolerant replay mechanism. This scenario is similar to the settlement example in Section 1.
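A minimal sketch of those three overlap policies follows; the policy names and the task interface (is_running, enqueue_instance, and so on) are illustrative placeholders, not the platform's real API.

```python
# Three ways to handle a trigger that arrives while the previous instance runs.
from enum import Enum

class OverlapPolicy(Enum):
    QUEUE = "queue"                        # default: run after the previous instance ends
    DISCARD_NEW = "discard_new"            # drop this trigger, keep the running instance
    DISCARD_PREVIOUS = "discard_previous"  # cancel the running instance, start fresh

def on_trigger(task, policy):
    if not task.is_running():
        return task.start_new_instance()
    if policy is OverlapPolicy.QUEUE:
        task.enqueue_instance()            # picked up when the current run ends
    elif policy is OverlapPolicy.DISCARD_NEW:
        task.record_skipped_instance()     # must remain replayable later
    else:
        task.cancel_running_instance()     # also replayable via fault tolerance
        task.start_new_instance()
```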

2.8 Graphical progress view

Based on the call chain and the instances of different trigger frequencies, the execution progress can be viewed graphically as a tree: for example, the state and real-time execution logs of each transformer and operator in a job. The graph is the chain derived from the trigger dependencies; it is an intuitive way to lay out a clear context within an otherwise cumbersome web of call relationships and is a standard display in scheduling systems. At a more advanced stage, you can also inspect the parameters being passed, the progress bars of concurrent operators, and the estimated completion time.

2.9 Alarm

When a task's return does not meet expectations, it is flagged and suspended, and an alert is sent by mail or SMS to a preset user or user group. Alerts can be triggered by a single host, or when a certain proportion of hosts exceed a threshold within a time window. It must also be possible to mute alerts, which is used during operations, maintenance, or upgrade deployments.

The above are features that many conventional schedulers share, extended for the distributed scenario. Going from simple single-point logic to multi-node coordination undoubtedly adds engineering overhead; these capabilities are improved gradually as the business evolves, and high availability and high efficiency are the changes a distributed environment demands.

3. Big data distributed scheduling

Big data distributed scheduling builds on the general scheduling above with improvements specific to the characteristics of data. It is organized mainly around the data pipeline, explaining the problems of upstream/downstream data and lineage. So what exactly is specific to big data?

3.1 Data fan-in and fan-out

There are many storage and retrieval solutions for big data; diversity is one of its defining characteristics. Serving diverse business scenarios means different engines and storage choices, and that diversity makes data exchange complicated, because data access between engines needs individual support. For example, moving HBase data to MySQL and ElasticSearch (ES below) involves reading from HBase and then writing to the other two. For HBase this is a one-to-two fan-out, but after fetching the data with Get or Scan you still need to understand the storage structure, and even the index structure, of the two targets in order to insert the data. To keep this kind of cross-engine (or cross-version, different-API) exchange generic, the requirements have to be abstracted. The takeaway platform defines a set of "Open SQL" for data exchange: the framework abstracts storage and computation separately and has concrete implementations for each target engine, under a set of agreed conventions.

Figure 2 Open SQL fan-in/fan-out flow

Primary key: the data must have a business primary key or a composite primary key, so that aggregation and updates have a basis. The primary key maps to the RowKey in NoSQL engines, to the primary key in relational databases and in ES, and likewise to the primary key in Kudu, where an upsert can decide between update and insert based on it.

Data columns: changes to data columns are a bit more involved. Adding or changing columns mainly concerns relational databases; in HBase and ES there is basically no need to extend the columns explicitly, only the data itself needs to change.

Partition fields: for fact-table data, when data volumes are large, partitioning and bucketing strategies are generally provided to optimize retrieval efficiency and storage. For the Hive, Impala, and GreenPlum engines, additional partition fields are added, from one level to many levels. In typical business scenarios the first partition is the date, and finer granularity or other business fields can be used according to actual needs. For MySQL, PostgreSQL, and HBase, there is generally no need to add a separate partition field.

Data update range: to improve the efficiency of concurrent processing across multiple batches, big data exchange requires slicing each batch of data. The slice is normally cut on a single field, most often a timestamp (create_time, update_time); the data can also be fetched batch by batch after sorting on the primary key. When the source engine allows it, multiple concurrent queries turn what would otherwise be a serialized read into several concurrent stages (a sketch of this slicing follows at the end of this list). The update range also matters when the same task runs in multiple time batches, where each batch defines the range of data it covers. Before use, the update range generally determines the volume of this update, and the offset can then be computed from the optimal single-batch size of the source and target engines.

Multi-step process: as the name suggests, the data is not prepared in one go. For example, suppose employee information is spread across MySQL, PostgreSQL, and Oracle databases with a unified employee number, and the final data is to be gathered in DB2. The most basic first step is to import the three copies of the data into the target; this involves merging data by the key, which in turn involves inserts and updates, but as long as there is a key and the target columns of the different sources are clear, it makes little difference which of the three arrives early or late. The second step analyzes and aggregates the merged data under some filtering conditions, using it as the calculation source and fanning the results out again. The third step can delete the temporary result of the first step. So in a multi-step scenario, data is merged, aggregated, and deleted step by step.

Update types: the Open SQL scenarios used in Baidu Waimai's big data practice include Insert (large batches of detail data), Update (subsequent updates to data), Insert Once (inserting aggregation results), Insert Temp (caching temporary results), and Delete (cleanup scenarios). When these operation types are combined, an execution priority needs to be declared as well: if priorities are set, the steps execute in order from front to back; if not, they may run concurrently.

Black-box exposure: a black-box operation is a compromise for logic that cannot follow the agreed Open SQL conventions. It serves two purposes. On the one hand, the black box must still expose its data dependencies to the outside, which provides the raw material for sorting out lineage later. On the other hand, the black box preserves flexibility in data processing, such as path-style extraction from JSON or a centralized cache optimization. It exposes the source data it depends on, but the processing itself remains hard to interpret externally; moreover, a black box usually targets whole tables or multiple fields, so its granularity is not fine enough.
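The conventions above (primary key, columns, partition, update range, update type) could be expressed as a task description like the hypothetical sketch below; the field names are illustrative, not the platform's actual schema. The second half sketches the update-range slicing described above, splitting one window into concurrent extraction batches.

```python
# Hypothetical Open SQL task description plus update-range slicing.
from datetime import datetime

open_sql_task = {
    "source": {"engine": "hbase", "table": "order_detail"},
    "targets": [
        {"engine": "mysql", "table": "ods_order", "update_type": "insert"},
        {"engine": "elasticsearch", "index": "order_search", "update_type": "update"},
    ],
    "primary_key": ["order_id"],                         # business or composite key
    "columns": ["order_id", "merchant_id", "amount", "update_time"],
    "partition": {"field": "dt", "granularity": "day"},  # only for engines that need it
    "update_range": {"field": "update_time",
                     "start": datetime(2020, 5, 13, 0, 0),
                     "end": datetime(2020, 5, 13, 0, 30)},
    "priority": 1,                                       # ordering among combined operations
}

def split_update_range(start, end, batches):
    """Yield (lower, upper) half-open sub-ranges covering [start, end)."""
    step = (end - start) / batches
    for i in range(batches):
        upper = end if i == batches - 1 else start + (i + 1) * step
        yield start + i * step, upper

for lo, hi in split_update_range(open_sql_task["update_range"]["start"],
                                 open_sql_task["update_range"]["end"], 3):
    # each sub-range becomes one concurrent extraction, e.g.
    #   SELECT ... WHERE update_time >= :lo AND update_time < :hi
    print(lo, "->", hi)
```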

Open SQL is the specification big data follows when doing ETL. The goal is that data exchange and flow are expressed through a configuration paradigm rather than through hard coding or pure componentization; coding effort goes instead into providing rich analytic functions and better caching and reuse of large intermediate result sets. Open SQL answers the philosophical questions of where the data comes from and where it can go, and it can also explain what operations are performed on the data, which provides the most basic guidance for data lineage later on. During its development, the Baidu Waimai big data platform has gone through the following stages.

Figure 3 The evolution of distributed scheduling

3.2 Parameter consistency across collaborating tasks

Besides the upstream/downstream relationships described above, the scheduling strategy in a big data scenario also needs to keep data processing coordinated, and above all to pass parameters downstream precisely. Upstream and downstream can use the system's default parameter keys or define custom keys. System parameters include the start and end timestamps, the host IP, the task instance, and so on; the scheduling system assigns the values for the global default keys.

Parameters have two scopes: local and global. A local Key:Value can be set without overwriting the global value of the same key; local takes precedence over global, while global variables are generated upstream and passed along. The scheduler itself specifies how different operators add, parse, and encode received parameters; for example, adding parameters to a shell command differs a great deal from adding them to a WebService call.

Besides its scope, a parameter also has a pass-through attribute: an upstream parameter can be targeted at downstream output. Likewise, an operator that receives an upstream parameter may choose to modify the value locally, but the value being passed along the chain is not modified.
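A minimal sketch of that resolution order (names are illustrative): the scheduler fills in system keys, the job's global parameters flow downstream, and an operator's local parameters override globals of the same key.

```python
# Parameter resolution: system defaults < job-level globals < operator locals.
from datetime import datetime

def resolve_params(system, job_global, operator_local):
    merged = {}
    merged.update(system)          # e.g. start/end timestamps, host IP, instance id
    merged.update(job_global)      # produced upstream, passed along the chain
    merged.update(operator_local)  # a local Key:Value wins over the global one
    return merged

system = {"start_ts": datetime(2020, 5, 13, 2, 0), "end_ts": datetime(2020, 5, 13, 2, 30)}
print(resolve_params(system, {"biz_date": "2020-05-12", "retries": 3}, {"retries": 1}))
# -> retries comes out as 1: local overrides global, global stays untouched upstream
```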

3.3 Real-time data quality checks

Data production generally verifies the data before delivery. Because big data pipelines are long, verifying quality only after the final output means problems are discovered late. Verifying the data at each staged delivery instead allows earlier intervention and keeps delivery reliable and timely.

Check operator: a dedicated operator is designed around data verification to provide quality assurance. There are generally two ways to verify data: comparison against its own history, and comparison against another data source. The former only needs the appropriate SQL or standard API on the target source to fetch the data of the current production window, then applies year-over-year and period-over-period comparisons, sliding-window averages, lower/upper bounds, and so on; the time granularity can be days, hours, or minutes. Comparing against another data source requires describing the source and target separately and supports strict equality, interval, and fluctuation-rate comparisons, mostly used in data exchange scenarios. Beyond comparing the data itself, the operator also compares key field types, precision, and width, and outputs statistics on null rate, duplication rate, and distinctness, to give a visual view of the sparseness and distribution of the data.

Full and sampled comparison: when comparing against another data source, the conventional approach is a macro comparison of row counts (Count), optionally with Sum and Avg over numeric columns. Note that storage precision differs slightly across engines, so integer fields should be preferred. In addition, sampled detail rows can be compared field by field across the whole row, which easily catches missing field values, type changes, and similar problems.

Note that if no Check operator is configured, the data is considered deliverable once produced; if the data's tree does contain Check operators, all production nodes before the next Check operator are considered deliverable by default. This default exists because verification does not have to be exhaustive, otherwise it costs too much time; in general, checks are only needed at key nodes. If a check fails, the ETL process is stopped with an alert and can then be retried or handled manually.
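One of the self-history checks mentioned above can be sketched as follows (function and parameter names are illustrative): compare the current window's row count to a sliding-window average of previous windows and fail the check if it drifts beyond a tolerance.

```python
# Sliding-window check against the data's own history.
def sliding_window_check(history_counts, current_count, window=7, tolerance=0.3):
    """history_counts: row counts of recent production windows, oldest first."""
    recent = history_counts[-window:]
    if not recent:
        return True                          # nothing to compare against yet
    baseline = sum(recent) / len(recent)
    if baseline == 0:
        return current_count == 0
    return abs(current_count - baseline) / baseline <= tolerance

# e.g. daily row counts for the past week; today's count dropped sharply:
print(sliding_window_check([980, 1010, 1005, 998, 1023, 990, 1001], 450))
# -> False: raise an alert and stop the ETL chain at this node
```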

3.4 Data lineage

A philosophical interpretation: lineage analysis is one of the features that distinguishes big data scheduling from other scheduling. It answers big data's "philosophy-of-life" questions: who am I, where do I come from, and where am I going. The basis for all of this is the Open SQL specification for data access; parsing Open SQL is what makes lineage analysis possible. Lineage mainly covers a dataset's upstream dependencies and its downstream dependents, and the two usually go together. There is also a third aspect: the calculation logic, or caliber, behind an output. Because big data is pushed to different business scenarios after computation and mining, the same nominal metric can end up with different results; when asked about the calculation logic, engineers are often at a loss and have to trace back through source code and process history, which adds unproductive cost.

Exposing the calculation logic is therefore also an important feature, routinely reducing the manpower cost of tracing logic by hand.

Open SQL can be explained to the outside world: where the data comes from and where it goes, along with the calculation caliber at the level of the concrete SQL or API. Recall, though, the black-box exposure discussed earlier and the rich functions developers rely on in ETL: the black box cannot explain its calculation logic, but a function can at least declare its input and output parameters, which keeps the cost of providing this third aspect as low as possible.

Lineage analysis relies on SQL syntax parsing for the primary engines. For example, MySQL can be parsed with Alibaba Druid or JSqlParser, GreenPlum and PostgreSQL with JSqlParser, and Impala via impala-frontend. The platform needs to resolve, for every single field, which upstream library tables and fields it depends on; the finer the resolution, the more targeted a backtrack over the big data can be, and the more it helps efficiency.
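The platform does this parsing with the Java-side parsers named above; purely as a language-agnostic illustration of the same idea, the sketch below uses the Python sqlglot parser to pull source tables and referenced columns out of a statement so that field-level lineage can be recorded (the SQL text and output are examples only).

```python
# Illustrative lineage extraction with sqlglot (a stand-in for druid/JSqlParser).
from sqlglot import parse_one, exp

sql = ("INSERT INTO dw.order_sum "
       "SELECT o.merchant_id, SUM(o.amount) FROM ods.order_detail o GROUP BY o.merchant_id")
tree = parse_one(sql)

tables = {t.sql() for t in tree.find_all(exp.Table)}    # target and source tables
columns = {c.sql() for c in tree.find_all(exp.Column)}  # fields the target depends on
print(tables)   # e.g. {'dw.order_sum', 'ods.order_detail AS o'} (exact form varies by version)
print(columns)  # e.g. {'o.merchant_id', 'o.amount'}
```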


For non-SQL sources, such as HBase and ElasticSearch, the dependencies are likewise mapped to specific documents/tables, to columns within column families, and to keys inside the source document.

In short, making data interpretable is the value of lineage. Lineage and Open SQL are also milestones in the evolution of ETL.

3.5 Table-based Transformer evolution

In big data scheduling, the most intuitive thing to show users is whether a table can be delivered, or more precisely, which fields of the table are ready for delivery. This lets downstream data be selected more carefully and triggered with fine-grained dependencies. Big data scheduling therefore distinguishes three roles, from coarse to fine granularity: Job, Transformer, and Operator.

Figure 4 An example of collaboration among the three roles

The following explains the division of labor and collaboration between the three:

Job: a Job's main role is to coordinate data relevance. Put simply, it is the coordination plan for collaboration between tables and across multiple data sources. It is the coarsest-grained process and the unit that gets instantiated for scheduling; the process uses the Job as its entrance, and the other two roles cannot be instantiated on their own. Tasks that are data-dependent but do not necessarily run on the same execution frequency are distinguished here and can adopt different job dependency configurations.

Transformer: a Transformer represents a table. The table is singled out because it is a complete symbol in the delivery process of big data: the library is too coarse a granularity, and the field is too fine to be fully expressed externally.

Operator: the operator is the finest granularity of scheduling and is indivisible. Operators fall into many categories depending on the application. There are control operators, such as start/stop operators, dispatch operators, and Check operators, and there are functional operators that encapsulate data operations, such as pushing HDFS data to MySQL or moving files from FTP to object storage. In big data scheduling, a functional operator produces a single field or a few fields, depending entirely on how hard the data is to generate and on how backtracking is combined, and it is ultimately configured via Open SQL. For example, one line is taken to describe one operator's function: the select side may fetch several fields, and the corresponding insert may write several fields. Once the core development of big data scheduling is complete, most later operation and maintenance effort goes into enriching the operators. Operator implementations take the flexibility and generality options mentioned earlier into account.
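The three-level model can be sketched roughly as below; the class and field names are illustrative only, not the platform's real data model.

```python
# Job -> Transformer (one per table) -> Operator (indivisible unit).
from dataclasses import dataclass, field
from typing import List

@dataclass
class Operator:
    name: str
    kind: str                  # "control" (start/stop, dispatch, check) or "functional"
    produces_fields: List[str] = field(default_factory=list)

@dataclass
class Transformer:
    table: str                 # one Transformer stands for one deliverable table
    operators: List[Operator] = field(default_factory=list)

@dataclass
class Job:
    name: str                  # only Jobs are instantiated and scheduled
    transformers: List[Transformer] = field(default_factory=list)

job = Job("daily_order_etl", [
    Transformer("ods.order_detail", [
        Operator("load_from_hbase", "functional", ["order_id", "amount"]),
        Operator("check_row_count", "control"),
    ]),
])
```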

3.6 Field-level refinement

Field-level backtracking mainly relies on "2 + 1" to work. The "2" refers to data lineage plus an updatable target engine: through Open SQL, the data's lineage can be sorted out, which makes it easy to identify, across the whole chain, which points can be depended on upstream and downstream and which points can run concurrently. The "1" means that in the scheduling system's graphical interface you can, for a specific instantiated Job, select a Transformer or some operators to backtrack.

Using the flow in Figure 4 above as a concrete example: the black 0 and 6 in the figure represent the black-box parts of Open SQL, i.e. production steps the data cannot explain; the three shapes marked 2 represent Check operators; and rounded squares of the same color indicate an upstream/downstream dependency, for example 7 depends on the upstream 1. Consider the following backtracking scenarios:

Backtracking operator 1: in this case operators 1/2/3/4/6 are backtracked, while operators 0 and 5 are not executed. Because Check operator 2 immediately follows 1, once 1 has executed, operator 7 is not executed concurrently, since the black-box operator 6 stands in between. However, after Check operator 2 succeeds, if operator 6's dependencies and outputs could be exposed, operator 7 could execute without waiting for operators 3/4/6 to finish, which would save a certain amount of time. The other scenarios are similar.

Backtracking Transformer 2: in this scenario operators 7 and 9 are triggered at the same time. Even if operator 9 finishes, operator 11 in the downstream Transformer 3 is not executed, because it is not a head node; but once operator 7 completes, operators 13 and 10 are both triggered.

An updatable target engine means something other than a SQL-on-Hadoop file solution: engines like GreenPlum, HBase, and ES can be updated in real time. We won't go into detail here.

3.7 Semaphores

The semaphore acts as message middleware in big data distributed scheduling. The producer's main role is to publish a signal when data production finishes and the data has passed quality verification; the signal carries the specific library, table, fields, the data range of this batch, and other information. Consumers can listen to the topics of the tables they care about and complete their follow-up operations. Semaphores decouple downstream dependence on the data nicely. They can also drive the completion status of library tables and fields in the data mart, letting users check whether data is available and ready to be delivered.
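The article does not name a specific broker, so purely as one possible realization, the sketch below uses Kafka (via the kafka-python client) with a hypothetical per-table topic name: the producer publishes a "data ready" signal after the quality check passes, and a downstream consumer reacts to it instead of polling the table.

```python
# Semaphore over message middleware: publish "data ready", consume per table topic.
import json
from kafka import KafkaProducer, KafkaConsumer

SIGNAL_TOPIC = "signal.dw.order_sum"          # one topic per table (hypothetical naming)

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode(),
)
producer.send(SIGNAL_TOPIC, {
    "table": "dw.order_sum",
    "fields": ["merchant_id", "amount"],
    "batch_range": {"start": "2020-05-13 02:00", "end": "2020-05-13 02:30"},
    "quality_check": "passed",
})
producer.flush()

# A downstream job listens for the signal instead of polling the table:
consumer = KafkaConsumer(
    SIGNAL_TOPIC,
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda v: json.loads(v.decode()),
)
for signal in consumer:
    ready = signal.value          # carries table, fields and batch range
    print("data ready:", ready)   # a real consumer would kick off its own job here
```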

Summary

The application scenarios of big data distributed scheduling are closely tied to the ETL process, the data engines, and the needs of the business. Distributed scheduling improves gradually, driven by those scenarios. Baidu Waimai's big data scheduling V2.0 started from general-purpose scheduling and then, after discovering problems such as poor data interpretability and delays in fine-grained updates, went through step-by-step iterative improvement. We look forward to the day our system is open-sourced.

