Earlier posts covered the basic Airflow architecture and how to deploy Airflow in containers. Today we will look at how to build a robust distributed scheduling cluster with Airflow and Celery.
As before, the Airflow cluster is installed on Ubuntu 20.04.3 LTS machines; this time we prepare three servers with identical specs for the test. In the previous article[1] we already installed all of the Airflow components on the Bigdata1 server (follow the link if you have not read it yet), so now we only need to install the worker component on the other two nodes.
| Component | Bigdata1 (A) | Bigdata2 (B) | Bigdata3 (C) |
|---|---|---|---|
| Webserver | √ | | |
| Scheduler | √ | | |
| Worker | √ | √ | √ |
The docker-compose.yml in the previous article did not separate the deployment files from the data directories, which makes later management awkward. So we can stop the services and split the database and data directories away from the deployment files.
Keeping them apart makes unified management much easier later on.
Preparation
```bash
mkdir -pv /data/airflow/{dags,plugins}
mkdir -pv /apps/airflow
mkdir -pv /logs/airflow
```
The deployment file for the workers:
```yaml
---
version: '3'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: mysql+mysqldb://airflow:aaaa@$${MYSQL_HOST}:3306/airflow  # change to your MySQL account and password
    AIRFLOW__CELERY__RESULT_BACKEND: db+mysql://airflow:aaaa@$${MYSQL_HOST}:3306/airflow  # change to your MySQL account and password
    AIRFLOW__CELERY__BROKER_URL: redis://:xxxx@$${REDIS_HOST}:7480/0  # change to your Redis password
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - /data/airflow/dags:/opt/airflow/dags
    - /logs/airflow:/opt/airflow/logs
    - /data/airflow/plugins:/opt/airflow/plugins
    - /data/airflow/airflow.cfg:/opt/airflow/airflow.cfg
  user: "${AIRFLOW_UID:-50000}:0"

services:
  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    hostname: bigdata-20-194  # set the container hostname so it is easy to see in Flower which worker this is
    depends_on:
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set"
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
    user: "0:0"
    volumes:
      - .:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow
```
Run the initialization check to verify that the environment meets the requirements:
```bash
cd /apps/airflow/
echo -e "AIRFLOW_UID=$(id -u)" > .env  # Note: AIRFLOW_UID must be the UID of a regular user, and that user must have permission to create the persistent directories
docker-compose up airflow-init
```
If the database already exists, the initialization check will not affect it. Next, start the airflow-worker service:
```bash
docker-compose up -d
```
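To confirm that the worker container actually came up and registered as a Celery worker, a quick look at its state and logs is usually enough. A minimal sketch; the service name `airflow-worker` is the one defined in the compose file above:

```bash
# Check that the worker container is up and its health check passes
docker-compose ps airflow-worker

# Tail the worker logs; a successful start ends with the Celery banner and a "ready" message
docker-compose logs -f --tail=100 airflow-worker
```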
Next, install the airflow-worker service on the bigdata3 node in the same way. Once deployment is complete, you can check the status of the workers on the broker through Flower:
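Besides the Flower UI, the same ping used in the compose health check can be run by hand to verify that every worker answers over the broker. A sketch, assuming Flower from the earlier article is listening on its default port 5555 on Bigdata1:

```bash
# Ask all Celery workers registered on the broker to respond
docker-compose exec airflow-worker \
  celery --app airflow.executors.celery_executor.app inspect ping

# Or open Flower in a browser (5555 is Flower's default port):
# http://bigdata1:5555/
```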
In most cases, with a multi-worker Airflow cluster we need to persist the Airflow configuration file and synchronize it to all nodes. To do this, modify the volumes of x-airflow-common in docker-compose.yaml so that airflow.cfg is mounted into the container as a volume; you can copy the configuration file out of a container first and then edit it.
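One way to obtain that file is to copy the generated default out of a container that does not yet have the airflow.cfg mount (for example the one installed in the previous article, or the worker started with the mount line commented out). A sketch, assuming the service is named `airflow-worker`:

```bash
# Copy the default airflow.cfg out of the running container to the host path that the compose file mounts
docker cp $(docker-compose ps -q airflow-worker):/opt/airflow/airflow.cfg /data/airflow/airflow.cfg
```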
To start with, we need to write the values of some of the environment variables from the docker-compose file into the airflow.cfg file, for example:
```ini
[core]
dags_folder = /opt/airflow/dags
hostname_callable = socket.getfqdn
# set the timezone
default_timezone = Asia/Shanghai
executor = CeleryExecutor
sql_alchemy_conn = mysql+mysqldb://airflow:aaaa@$${MYSQL_HOST}:3306/airflow
sql_engine_encoding = utf-8
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
sql_alchemy_max_overflow = 10
sql_alchemy_pool_recycle = 1800
sql_alchemy_pool_pre_ping = True
sql_alchemy_schema =
parallelism = 32
max_active_tasks_per_dag = 16
dags_are_paused_at_creation = True
max_active_runs_per_dag = 16
load_examples = True
load_default_connections = True
plugins_folder = /opt/airflow/plugins
execute_tasks_new_python_interpreter = False
fernet_key =
donot_pickle = True
dagbag_import_timeout = 30.0
dagbag_import_error_tracebacks = True
dagbag_import_error_traceback_depth = 2
dag_file_processor_timeout = 50
task_runner = StandardTaskRunner
default_impersonation =
security =
unit_test_mode = False
enable_xcom_pickling = False
killed_task_cleanup_time = 60
dag_run_conf_overrides_params = True
dag_discovery_safe_mode = True
default_task_retries = 0
default_task_weight_rule = downstream
min_serialized_dag_update_interval = 30
min_serialized_dag_fetch_interval = 10
max_num_rendered_ti_fields_per_task = 30
check_slas = True
xcom_backend = airflow.models.xcom.BaseXCom
lazy_load_plugins = True
lazy_discover_providers = True
max_db_retries = 3
hide_sensitive_var_conn_fields = True
sensitive_var_conn_names =
default_pool_task_slot_count = 128

[logging]
base_log_folder = /opt/airflow/logs
remote_logging = False
remote_log_conn_id =
google_key_path =
remote_base_log_folder =
encrypt_s3_logs = False
logging_level = INFO
fab_logging_level = WARNING
logging_config_class =
colored_console_log = True
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
task_log_prefix_template =
log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
task_log_reader = task
extra_logger_names =
worker_log_server_port = 8793

[metrics]
statsd_on = False
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
statsd_allow_list =
stat_name_handler =
statsd_datadog_enabled = False
statsd_datadog_tags =

[secrets]
backend =
backend_kwargs =

[cli]
api_client = airflow.api.client.local_client
endpoint_url = http://localhost:8080

[debug]
fail_fast = False

[api]
enable_experimental_api = False
auth_backend = airflow.api.auth.backend.deny_all
maximum_page_limit = 100
fallback_page_limit = 100
google_oauth2_audience =
google_key_path =
access_control_allow_headers =
access_control_allow_methods =
access_control_allow_origins =

[lineage]
backend =

[atlas]
sasl_enabled = False
host =
port = 21000
username =
password =

[operators]
default_owner = airflow
default_cpus = 1
default_ram = 512
default_disk = 512
default_gpus = 0
default_queue = default
allow_illegal_arguments = False

[hive]
default_hive_mapred_queue =

[webserver]
# custom airflow domain
base_url = https://devopsman.cn/airflow
# set the default UI timezone
default_ui_timezone = Asia/Shanghai
web_server_host = 0.0.0.0
web_server_port = 8080
web_server_ssl_cert =
web_server_ssl_key =
web_server_master_timeout = 120
web_server_worker_timeout = 120
worker_refresh_batch_size = 1
worker_refresh_interval = 6000
reload_on_plugin_change = False
secret_key = emEfndkf3QWZ5zVLE1kVMg==
workers = 4
worker_class = sync
access_logfile = -
error_logfile = -
access_logformat =
expose_config = False
expose_hostname = True
expose_stacktrace = True
dag_default_view = tree
dag_orientation = LR
log_fetch_timeout_sec = 5
log_fetch_delay_sec = 2
log_auto_tailing_offset = 30
log_animation_speed = 1000
hide_paused_dags_by_default = False
page_size = 100
navbar_color = #fff
default_dag_run_display_number = 25
enable_proxy_fix = False
proxy_fix_x_for = 1
proxy_fix_x_proto = 1
proxy_fix_x_host = 1
proxy_fix_x_port = 1
proxy_fix_x_prefix = 1
cookie_secure = False
cookie_samesite = Lax
default_wrap = False
x_frame_enabled = True
show_recent_stats_for_completed_runs = True
update_fab_perms = True
session_lifetime_minutes = 43200
auto_refresh_interval = 3

[email]
email_backend = airflow.utils.email.send_email_smtp
email_conn_id = smtp_default
default_email_on_retry = True
default_email_on_failure = True

# mail configuration
[smtp]
smtp_host = localhost
smtp_starttls = True
smtp_ssl = False
smtp_port = 25
smtp_mail_from = airflow@example.com
smtp_timeout = 30
smtp_retry_limit = 5

[sentry]
sentry_on = false
sentry_dsn =

[celery_kubernetes_executor]
kubernetes_queue = kubernetes

[celery]
celery_app_name = airflow.executors.celery_executor
worker_concurrency = 16
worker_umask = 0o077
broker_url = redis://:xxxx@$${REDIS_HOST}:7480/0
result_backend = db+mysql://airflow:aaaa@$${MYSQL_HOST}:3306/airflow
flower_host = 0.0.0.0
flower_url_prefix =
flower_port = 5555
flower_basic_auth =
sync_parallelism = 0
celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
ssl_active = False
ssl_key =
ssl_cert =
ssl_cacert =
pool = prefork
operation_timeout = 1.0
task_track_started = True
task_adoption_timeout = 600
task_publish_max_retries = 3
worker_precheck = False

[celery_broker_transport_options]

[dask]
cluster_address = 127.0.0.1:8786
tls_ca =
tls_cert =
tls_key =

[scheduler]
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5
num_runs = -1
scheduler_idle_sleep_time = 1
min_file_process_interval = 30
dag_dir_list_interval = 300
print_stats_interval = 30
pool_metrics_interval = 5.0
scheduler_health_check_threshold = 30
orphaned_tasks_check_interval = 300.0
child_process_log_directory = /opt/airflow/logs/scheduler
scheduler_zombie_task_threshold = 300
catchup_by_default = True
max_tis_per_query = 512
use_row_level_locking = True
max_dagruns_to_create_per_loop = 10
max_dagruns_per_loop_to_schedule = 20
schedule_after_task_execution = True
parsing_processes = 2
file_parsing_sort_mode = modified_time
use_job_schedule = True
allow_trigger_in_future = False
dependency_detector = airflow.serialization.serialized_objects.DependencyDetector
trigger_timeout_check_interval = 15

[triggerer]
default_capacity = 1000

[kerberos]
ccache = /tmp/airflow_krb5_ccache
principal = airflow
reinit_frequency = 3600
kinit_path = kinit
keytab = airflow.keytab
forwardable = True
include_ip = True

[github_enterprise]
api_rev = v3

[elasticsearch]
host =
log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number}
end_of_log_mark = end_of_log
frontend =
write_stdout = False
json_format = False
json_fields = asctime, filename, lineno, levelname, message
host_field = host
offset_field = offset

[elasticsearch_configs]
use_ssl = False
verify_certs = True

[kubernetes]
pod_template_file =
worker_container_repository =
worker_container_tag =
namespace = default
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 1
multi_namespace_mode = False
in_cluster = True
kube_client_request_args =
delete_option_kwargs =
enable_tcp_keepalive = True
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6
verify_ssl = True
worker_pods_pending_timeout = 300
worker_pods_pending_timeout_check_interval = 120
worker_pods_queued_check_interval = 60
worker_pods_pending_timeout_batch_size = 100

[smart_sensor]
use_smart_sensor = False
shard_code_upper_limit = 10000
shards = 5
sensors_enabled = NamedHivePartitionSensor
```
After making the changes, restart the services.
```bash
docker-compose restart
```
Because Airflow now runs on three worker nodes, any configuration change made on one node must be propagated to the others, and the DAGs and plugins directories also need to be synchronized in real time: once the scheduler dispatches a task to a node, the task will fail if the corresponding DAG file cannot be found there. Therefore we use lsyncd for real-time data synchronization:
```bash
apt-get install lsyncd -y
```
Configure key-based SSH connections between the nodes:
```bash
ssh-keygen -t rsa -b 4096 -C "airflow-sync" -f ~/.ssh/airflow-sync  # generate a key pair named airflow-sync
for ip in 100 200; do ssh-copy-id -i ~/.ssh/airflow-sync.pub -p 12022 ${USERNAME}@192.168.0.$ip; done
```
Now we can access the other nodes with the private key.
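It is worth verifying the key-based login before wiring it into lsyncd. A minimal check; `${USERNAME}`, the port and the key path follow the commands above:

```bash
# Each of these should print the remote hostname without prompting for a password
for ip in 100 200; do
  ssh -i ~/.ssh/airflow-sync -p 12022 -o StrictHostKeyChecking=no ${USERNAME}@192.168.0.$ip hostname
done
```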
Edit the sync configuration file. For the full set of lsyncd configuration parameters, see the official documentation[2].
```lua
settings {
    logfile = "/var/log/lsyncd.log",        -- log file
    statusFile = "/var/log/lsyncd.status",  -- sync status information
    pidfile = "/var/run/lsyncd.pid",
    statusInterval = 1,
    nodaemon = false,                       -- run as a daemon
    inotifyMode = "CloseWrite",
    maxProcesses = 1,
    maxDelays = 1,
}
sync {
    default.rsync,
    source = "/data/airflow",
    target = "192.168.0.100:/data/airflow",
    rsync = {
        binary = "/usr/bin/rsync",
        compress = false,
        archive = true,
        owner = true,
        perms = true,
        -- delete = true,
        whole_file = false,
        rsh = "/usr/bin/ssh -p 12022 -l suoper -o StrictHostKeyChecking=no -i /home/username/.ssh/airflow-rsync"
    },
}
sync {
    default.rsync,
    source = "/data/airflow",
    target = "192.168.0.200:/data/airflow",
    rsync = {
        binary = "/usr/bin/rsync",
        compress = false,
        archive = true,
        owner = true,
        perms = true,
        -- delete = true,
        whole_file = false,
        rsh = "/usr/bin/ssh -p 12022 -l suoper -o StrictHostKeyChecking=no -i /home/username/.ssh/airflow-rsync"
    },
}
```
The meaning of each parameter is documented on the official site. Here, rsync's rsh option defines the ssh command to use, which covers setups that rely on a private key, a custom port, and other hardening measures; alternatively, you can configure passwordless access and simply use default.rsync or default.rsyncssh.
Manage lsyncd as a systemd service:
```bash
cat << EOF > /etc/systemd/system/lsyncd.service
[Unit]
Description=lsyncd
ConditionFileIsExecutable=/usr/bin/lsyncd
After=network-online.target
Wants=network-online.target

[Service]
StartLimitBurst=10
ExecStart=/usr/bin/lsyncd /etc/lsyncd.conf
Restart=on-failure
RestartSec=120
EnvironmentFile=-/etc/sysconfig/aliyun
KillMode=process

[Install]
WantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl enable --now lsyncd.service  # start the service and enable it at boot
```
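A quick way to confirm that lsyncd is actually running under systemd and watching the directory (nothing project-specific assumed beyond the paths configured above):

```bash
# Service state and the most recent journal lines
systemctl status lsyncd.service --no-pager
journalctl -u lsyncd.service -n 20 --no-pager

# The status file configured above summarises what lsyncd is watching and syncing
cat /var/log/lsyncd.status
```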
This takes care of synchronizing the data (dags, plugins, airflow.cfg). Later, in a CI/CD workflow, you only need to upload DAG files to the Bigdata1 node and the other two nodes will pick them up automatically. If anything goes wrong, debug it through the logs:
```bash
lsyncd -log all /etc/lsyncd.conf
tail -f /var/log/lsyncd.log
```
If you need to put Airflow behind a reverse proxy, e.g. https://lab.mycompany.com/myorg/airflow/, you can do it with the following configuration[3]:
Configure base_url in airflow.cfg:
```ini
base_url = http://my_host/myorg/airflow
enable_proxy_fix = True
```
Nginx configuration:
```nginx
server {
    listen 80;
    server_name lab.mycompany.com;

    location /myorg/airflow/ {
        proxy_pass http://localhost:8080;
        proxy_set_header Host $http_host;
        proxy_redirect off;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}
```
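After adding the server block, validate and reload Nginx (standard Nginx commands, assuming Nginx runs as a systemd service on that host):

```bash
nginx -t                 # syntax-check all configuration files
systemctl reload nginx   # apply the new server block without dropping connections
```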
This basically completes the installation of the distributed Airflow scheduling cluster.
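To verify the whole chain end to end (lsyncd file sync plus CeleryExecutor dispatch), a hypothetical throwaway DAG dropped into the DAGs directory on Bigdata1 works well; the file name and task below are made up for illustration:

```bash
# Create a trivial DAG on Bigdata1; lsyncd copies it to the other nodes within seconds
cat > /data/airflow/dags/cluster_smoke_test.py << 'EOF'
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="cluster_smoke_test",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,  # trigger manually from the UI
    catchup=False,
) as dag:
    # Prints the worker's hostname so you can see which node ran the task
    BashOperator(task_id="print_worker", bash_command="hostname")
EOF

# Confirm the file arrived on the other two nodes
ssh -i ~/.ssh/airflow-sync -p 12022 ${USERNAME}@192.168.0.100 ls /data/airflow/dags/
ssh -i ~/.ssh/airflow-sync -p 12022 ${USERNAME}@192.168.0.200 ls /data/airflow/dags/
```

Trigger the DAG a few times from the web UI and check in Flower that the runs land on different workers.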
If you have read this far, you are probably using or at least interested in Airflow, so here is a bonus Airflow learning resource:
https://livebook.manning.com/book/data-pipelines-with-apache-airflow/chapter-12/1
References
[1] Airflow 2.2.3 + MySQL 8.0.27: https://mp.weixin.qq.com/s/VncpyXcTtlvnDkFrsAZ5lQ
[2] lsyncd config file: https://lsyncd.github.io/lsyncd/manual/config/file/
[3] airflow-behind-proxy: https://airflow.apache.org/docs/apache-airflow/stable/howto/run-behind-proxy.html