์ž์Šต์„œ 2: ์ž‘์—… ๋ฐ ์ž‘์—… ๊ทธ๋ž˜ํ”„(DAG) ๋งŒ๋“ค๊ธฐ ๋ฐ ๊ด€๋ฆฌยถ

์†Œ๊ฐœยถ

์ด ์ž์Šต์„œ์—์„œ๋Š” ๋ช‡ ๊ฐ€์ง€ ๊ธฐ๋ณธ ์ €์žฅ ํ”„๋กœ์‹œ์ €๋ฅผ ๊ด€๋ฆฌํ•˜๊ธฐ ์œ„ํ•ด Snowflake ์ž‘์—…์„ ์ƒ์„ฑํ•˜๊ณ  ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค. ๋˜ํ•œ ์ž‘์—… ๊ทธ๋ž˜ํ”„(๋ฐฉํ–ฅ์„ฑ ๋น„์ˆœํ™˜ ๊ทธ๋ž˜ํ”„(DAG)๋ผ๊ณ ๋„ ํ•จ)๋ฅผ ์ƒ์„ฑํ•˜์—ฌ ์ƒ์œ„ ์ˆ˜์ค€์˜ ์ž‘์—… ๊ทธ๋ž˜ํ”„ API๋กœ ์ž‘์—…์„ ์˜ค์ผ€์ŠคํŠธ๋ ˆ์ด์…˜ํ•  ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค.

์ „์ œ ์กฐ๊ฑดยถ

์ฐธ๊ณ 

์ด๋ฏธ Snowflake Python APIs ์ž์Šต์„œ์˜ ์ผ๋ฐ˜ ์„ค์ • ๋ฐ ์ž์Šต์„œ 1: ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค, ์Šคํ‚ค๋งˆ, ํ…Œ์ด๋ธ”, ์›จ์–ดํ•˜์šฐ์Šค ๋งŒ๋“ค๊ธฐ ์„ ์™„๋ฃŒํ•œ ๊ฒฝ์šฐ, ์ด๋Ÿฌํ•œ ์ „์ œ ์กฐ๊ฑด์„ ๊ฑด๋„ˆ๋›ฐ๊ณ  ์ด ์ž์Šต์„œ์˜ ์ฒซ ๋ฒˆ์งธ ๋‹จ๊ณ„๋กœ ๋„˜์–ด๊ฐˆ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์ด ์ž์Šต์„œ๋ฅผ ์‹œ์ž‘ํ•˜๊ธฐ ์ „์— ๋‹ค์Œ ๋‹จ๊ณ„๋ฅผ ์™„๋ฃŒํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

  1. ๋‹ค์Œ ๋‹จ๊ณ„๊ฐ€ ํฌํ•จ๋œ ์ผ๋ฐ˜ ์„ค์ • ์ง€์นจ์„ ๋”ฐ๋ฅด์‹ญ์‹œ์˜ค.

    • ๊ฐœ๋ฐœ ํ™˜๊ฒฝ์„ ์„ค์ •ํ•ฉ๋‹ˆ๋‹ค.

    • Snowflake Python APIs ํŒจํ‚ค์ง€๋ฅผ ์„ค์น˜ํ•ฉ๋‹ˆ๋‹ค.

    • Snowflake ์—ฐ๊ฒฐ์„ ๊ตฌ์„ฑํ•ฉ๋‹ˆ๋‹ค.

    • Python API ์ž์Šต์„œ์— ํ•„์š”ํ•œ ๋ชจ๋“  ๋ชจ๋“ˆ์„ ๊ฐ€์ ธ์˜ต๋‹ˆ๋‹ค.

    • API Root ์˜ค๋ธŒ์ ํŠธ๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

  2. ๋‹ค์Œ ์ฝ”๋“œ๋ฅผ ์‹คํ–‰ํ•˜์—ฌ ์ด๋ฆ„์ด PYTHON_API_DB ์ธ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์™€ ํ•ด๋‹น ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ์ด๋ฆ„์ด PYTHON_API_SCHEMA ์ธ ์Šคํ‚ค๋งˆ๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

    database = root.databases.create(
      Database(
        name="PYTHON_API_DB"),
        mode=CreateMode.or_replace
      )
    
    schema = database.schemas.create(
      Schema(
        name="PYTHON_API_SCHEMA"),
        mode=CreateMode.or_replace,
      )
    
    Copy

    ์ด๋Š” ์ž์Šต์„œ 1 ์—์„œ ์ƒ์„ฑํ•œ ๊ฒƒ๊ณผ ๋™์ผํ•œ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ๋ฐ ์Šคํ‚ค๋งˆ ์˜ค๋ธŒ์ ํŠธ์ž…๋‹ˆ๋‹ค.

์ด๋Ÿฌํ•œ ์‚ฌ์ „ ์š”๊ตฌ ์‚ฌํ•ญ์„ ์™„๋ฃŒํ•˜๋ฉด ์ž‘์—… ๊ด€๋ฆฌ๋ฅผ ์œ„ํ•ด API๋ฅผ ์‚ฌ์šฉํ•  ์ค€๋น„๊ฐ€ ๋œ ๊ฒƒ์ž…๋‹ˆ๋‹ค.

Snowflake ์˜ค๋ธŒ์ ํŠธ ์„ค์ •ยถ

์ž‘์—…์—์„œ ํ˜ธ์ถœํ•  ์ €์žฅ ํ”„๋กœ์‹œ์ €์™€ ์ €์žฅ ํ”„๋กœ์‹œ์ €๋ฅผ ๋ณด๊ด€ํ•  ์Šคํ…Œ์ด์ง€๋ฅผ ์„ค์ •ํ•ฉ๋‹ˆ๋‹ค. Snowflake Python APIs root ์˜ค๋ธŒ์ ํŠธ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ด์ „์— ์ƒ์„ฑํ•œ PYTHON_API_DB ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ๋ฐ PYTHON_API_SCHEMA ์Šคํ‚ค๋งˆ์— ์Šคํ…Œ์ด์ง€๋ฅผ ์ƒ์„ฑํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  1. ์ด๋ฆ„์ด TASKS_STAGE ์ธ ์Šคํ…Œ์ด์ง€๋ฅผ ์ƒ์„ฑํ•˜๋ ค๋ฉด ๋…ธํŠธ๋ถ์˜ ๋‹ค์Œ ์…€์—์„œ ๋‹ค์Œ ์ฝ”๋“œ๋ฅผ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.

    stages = root.databases[database.name].schemas[schema.name].stages
    stages.create(Stage(name="TASKS_STAGE"))
    
    Copy

    ์ด ์Šคํ…Œ์ด์ง€์—์„œ๋Š” ์ €์žฅ๋œ ํ”„๋กœ์‹œ์ €์™€ ํ•ด๋‹น ํ”„๋กœ์‹œ์ €์— ํ•„์š”ํ•œ ๋ชจ๋“  ์ข…์†์„ฑ์ด ๋ณด๊ด€๋ฉ๋‹ˆ๋‹ค.

  2. ์ž‘์—…์ด ์ €์žฅ ํ”„๋กœ์‹œ์ €๋กœ ์‹คํ–‰๋˜๋Š” ๋‘ ๊ฐ€์ง€ ๊ธฐ๋ณธ Python ํ•จ์ˆ˜๋ฅผ ์ƒ์„ฑํ•˜๋ ค๋ฉด ๋‹ค์Œ ์…€์—์„œ ๋‹ค์Œ ์ฝ”๋“œ๋ฅผ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.

    def trunc(session: Session, from_table: str, to_table: str, count: int) -> str:
      (
        session
        .table(from_table)
        .limit(count)
        .write.save_as_table(to_table)
      )
      return "Truncated table successfully created!"
    
    def filter_by_shipmode(session: Session, mode: str) -> str:
      (
        session
        .table("snowflake_sample_data.tpch_sf100.lineitem")
        .filter(col("L_SHIPMODE") == mode)
        .limit(10)
        .write.save_as_table("filter_table")
      )
      return "Filter table successfully created!"
    
    Copy

    ์ด๋Ÿฌํ•œ ํ•จ์ˆ˜๋Š” ๋‹ค์Œ์„ ์ˆ˜ํ–‰ํ•ฉ๋‹ˆ๋‹ค.

    • trunc(): ์ž…๋ ฅ ํ…Œ์ด๋ธ”์˜ ์ž˜๋ฆฐ ๋ฒ„์ „์„ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

    • filter_by_shipmode(): ๋ฐฐ์†ก ๋ชจ๋“œ๋ณ„๋กœ SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.LINEITEM ํ…Œ์ด๋ธ”์„ ํ•„ํ„ฐ๋งํ•˜๊ณ  ๊ฒฐ๊ณผ๋ฅผ 10๊ฐœ ํ–‰์œผ๋กœ ์ œํ•œํ•œ ๋‹ค์Œ ๊ฒฐ๊ณผ๋ฅผ ์ƒˆ ํ…Œ์ด๋ธ”์— ์”๋‹ˆ๋‹ค.

      ์ฐธ๊ณ 

      ์ด ํ•จ์ˆ˜๋Š” SNOWFLAKE_SAMPLE_DATA ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์˜ TPC-H ์ƒ˜ํ”Œ ๋ฐ์ดํ„ฐ ๋ฅผ ์ฟผ๋ฆฌํ•ฉ๋‹ˆ๋‹ค. Snowflake๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ ์ƒˆ ๊ณ„์ •์— ์ƒ˜ํ”Œ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค๋ฅผ ๋งŒ๋“ญ๋‹ˆ๋‹ค. ๊ท€ํ•˜์˜ ๊ณ„์ •์— ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค๊ฐ€ ์ƒ์„ฑ๋˜์ง€ ์•Š์€ ๊ฒฝ์šฐ ์ƒ˜ํ”Œ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์‚ฌ์šฉํ•˜๊ธฐ ์„น์…˜์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

    ์ด๋Ÿฌํ•œ ํ•จ์ˆ˜๋Š” ์˜๋„์ ์œผ๋กœ ๊ธฐ๋ณธ์ ์ธ ๊ฒƒ์ด๋ฉฐ ๋ฐ๋ชจ ๋ชฉ์ ์œผ๋กœ ๊ฐœ๋ฐœ๋˜์—ˆ์Šต๋‹ˆ๋‹ค.

์ž‘์—… ์ƒ์„ฑ ๋ฐ ๊ด€๋ฆฌยถ

์ด์ „์— ์ƒ์„ฑํ•œ Python ํ•จ์ˆ˜๋ฅผ ์ €์žฅ ํ”„๋กœ์‹œ์ €๋กœ ์‹คํ–‰ํ•˜๋Š” ๋‘ ๊ฐ€์ง€ ์ž‘์—…์„ ์ •์˜, ์ƒ์„ฑ ๋ฐ ๊ด€๋ฆฌํ•ฉ๋‹ˆ๋‹ค.

  1. ๋…ธํŠธ๋ถ์˜ ๋‹ค์Œ ์…€์— task1 ๋ฐ task2 ๋ผ๋Š” ๋‘ ์ž‘์—…์„ ์ •์˜ํ•˜๋ ค๋ฉด ๋‹ค์Œ ์ฝ”๋“œ๋ฅผ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.

    tasks_stage = f"{database.name}.{schema.name}.TASKS_STAGE"
    
    task1 = Task(
        name="task_python_api_trunc",
        definition=StoredProcedureCall(
          func=trunc,
          stage_location=f"@{tasks_stage}",
          packages=["snowflake-snowpark-python"],
        ),
        warehouse="COMPUTE_WH",
        schedule=timedelta(minutes=1)
    )
    
    task2 = Task(
        name="task_python_api_filter",
        definition=StoredProcedureCall(
          func=filter_by_shipmode,
          stage_location=f"@{tasks_stage}",
          packages=["snowflake-snowpark-python"],
        ),
        warehouse="COMPUTE_WH"
    )
    
    Copy

    ์ด ์ฝ”๋“œ์—์„œ๋Š” ๋‹ค์Œ ์ž‘์—… ๋งค๊ฐœ ๋ณ€์ˆ˜๋ฅผ ์ง€์ •ํ•ฉ๋‹ˆ๋‹ค.

    • ๊ฐ ์ž‘์—…์— ๋Œ€ํ•ด, ๋‹ค์Œ ํŠน์„ฑ์„ ํฌํ•จํ•˜๋Š” StoredProcedureCall ์˜ค๋ธŒ์ ํŠธ์— ์˜ํ•ด ํ‘œ์‹œ๋˜๋Š” ์ •์˜์ž…๋‹ˆ๋‹ค.

      • ์‹คํ–‰ํ•  ํ˜ธ์ถœ ๊ฐ€๋Šฅํ•œ ํ•จ์ˆ˜

      • Python ํ•จ์ˆ˜์˜ ๋‚ด์šฉ๊ณผ ์ข…์†์„ฑ์ด ์—…๋กœ๋“œ๋˜๋Š” ์Šคํ…Œ์ด์ง€ ์œ„์น˜

      • ์ €์žฅ ํ”„๋กœ์‹œ์ €์˜ ํŒจํ‚ค์ง€ ์ข…์†์„ฑ

    • ์ €์žฅ ํ”„๋กœ์‹œ์ €๋ฅผ ์‹คํ–‰ํ•  ์›จ์–ดํ•˜์šฐ์Šค(StoredProcedureCall ์˜ค๋ธŒ์ ํŠธ๊ฐ€ ์žˆ๋Š” ์ž‘์—…์„ ์ƒ์„ฑํ•  ๋•Œ ํ•„์š”). ์ด ์ž์Šต์„œ์—์„œ๋Š” ํ‰๊ฐ€ํŒ ๊ณ„์ •์— ํฌํ•จ๋œ COMPUTE_WH ์›จ์–ดํ•˜์šฐ์Šค๋ฅผ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.

    • ๋ฃจํŠธ ์ž‘์—…์˜ ์‹คํ–‰ ์ผ์ •, task1. ์ผ์ •์€ ์ž‘์—…์„ ์ฃผ๊ธฐ์ ์œผ๋กœ ์‹คํ–‰ํ•˜๋Š” ๊ฐ„๊ฒฉ์„ ์ง€์ •ํ•ฉ๋‹ˆ๋‹ค.

    ์ €์žฅ ํ”„๋กœ์‹œ์ €์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ SQL ๋ฐ Python์œผ๋กœ ์ €์žฅ ํ”„๋กœ์‹œ์ € ์ž‘์„ฑํ•˜๊ธฐ ๋ฅผ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

  2. ๋‘ ๊ฐœ์˜ ์ž‘์—…์„ ์ƒ์„ฑํ•˜๋ ค๋ฉด ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์Šคํ‚ค๋งˆ์—์„œ TaskCollection ์˜ค๋ธŒ์ ํŠธ(tasks)๋ฅผ ๊ฒ€์ƒ‰ํ•˜๊ณ  ์ž‘์—… ์ปฌ๋ ‰์…˜์—์„œ .create() ๋ฅผ ํ˜ธ์ถœํ•ฉ๋‹ˆ๋‹ค.

    # create the task in the Snowflake database
    tasks = schema.tasks
    trunc_task = tasks.create(task1, mode=CreateMode.or_replace)
    
    task2.predecessors = [trunc_task.name]
    filter_task = tasks.create(task2, mode=CreateMode.or_replace)
    
    Copy

    ์ด ์ฝ”๋“œ ์˜ˆ์ œ์—์„œ๋Š” task1 ์„ task2 ์˜ ์„ ํ–‰ ์ž‘์—…์œผ๋กœ ์„ค์ •ํ•˜์—ฌ ์ž‘์—…์„ ์—ฐ๊ฒฐํ•˜์—ฌ ์ตœ์†Œ ์ž‘์—… ๊ทธ๋ž˜ํ”„๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

  3. ๋‘ ์ž‘์—…์ด ์ด์ œ ์žˆ๋Š”์ง€ ํ™•์ธํ•˜๋ ค๋ฉด ๋‹ค์Œ ์…€์—์„œ ๋‹ค์Œ ์ฝ”๋“œ๋ฅผ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.

    taskiter = tasks.iter()
    for t in taskiter:
        print(t.name)
    
    Copy
  4. ์ž‘์—…์„ ์ƒ์„ฑํ•˜๋ฉด ๊ธฐ๋ณธ์ ์œผ๋กœ ์ž‘์—…์ด ์ผ์‹œ ์ค‘๋‹จ๋ฉ๋‹ˆ๋‹ค.

    ์ž‘์—…์„ ์‹œ์ž‘ํ•˜๋ ค๋ฉด ์ž‘์—… ๋ฆฌ์†Œ์Šค ์˜ค๋ธŒ์ ํŠธ์—์„œ .resume() ์„ ํ˜ธ์ถœํ•ฉ๋‹ˆ๋‹ค.

    trunc_task.resume()
    
    Copy
  5. trunc_task ์ž‘์—…์ด ์‹œ์ž‘๋˜์—ˆ๋Š”์ง€ ํ™•์ธํ•˜๋ ค๋ฉด ๋‹ค์Œ ์…€์—์„œ ๋‹ค์Œ ์ฝ”๋“œ๋ฅผ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.

    taskiter = tasks.iter()
    for t in taskiter:
        print("Name: ", t.name, "| State: ", t.state)
    
    Copy

    ์ถœ๋ ฅ์€ ๋‹ค์Œ๊ณผ ์œ ์‚ฌํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

    Name:  TASK_PYTHON_API_FILTER | State:  suspended
    Name:  TASK_PYTHON_API_TRUNC | State:  started
    

    ์ž‘์—… ์ƒํƒœ๋ฅผ ํ™•์ธํ•˜๊ณ  ์‹ถ์„ ๋•Œ๋งˆ๋‹ค ์ด ๋‹จ๊ณ„๋ฅผ ๋ฐ˜๋ณตํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  6. ์ž‘์—… ๋ฆฌ์†Œ์Šค๋ฅผ ์ •๋ฆฌํ•˜๋ ค๋ฉด ๋จผ์ € ์ž‘์—…์„ ์ผ์‹œ ์ค‘๋‹จํ•œ ๋‹ค์Œ ์‚ญ์ œํ•ฉ๋‹ˆ๋‹ค.

    ๋‹ค์Œ ์…€์—์„œ ๋‹ค์Œ ์ฝ”๋“œ๋ฅผ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.

    trunc_task.suspend()
    
    Copy
  7. ์ž‘์—…์ด ์ผ์‹œ ์ค‘๋‹จ๋˜์—ˆ๋Š”์ง€ ํ™•์ธํ•˜๋ ค๋ฉด 5๋‹จ๊ณ„๋ฅผ ๋ฐ˜๋ณตํ•ฉ๋‹ˆ๋‹ค.

  8. ์„ ํƒ ์‚ฌํ•ญ: ๋‘ ์ž‘์—…์„ ๋ชจ๋‘ ์‚ญ์ œํ•˜๋ ค๋ฉด ๋‹ค์Œ ์…€์—์„œ ๋‹ค์Œ ์ฝ”๋“œ๋ฅผ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.

    trunc_task.drop()
    filter_task.drop()
    
    Copy

์ž‘์—… ๊ทธ๋ž˜ํ”„๋ฅผ ์ƒ์„ฑํ•˜๊ณ  ๊ด€๋ฆฌํ•ฉ๋‹ˆ๋‹ค.ยถ

๋‹ค์ˆ˜์˜ ์ž‘์—… ์‹คํ–‰์„ ์กฐ์ •ํ•  ๋•Œ, ๊ฐ ์ž‘์—…์„ ๊ฐœ๋ณ„์ ์œผ๋กœ ๊ด€๋ฆฌํ•˜๋Š” ๊ฒƒ์€ ์–ด๋ ค์šธ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. Snowflake Python APIs ์€ ์ƒ์œ„ ์ˆ˜์ค€์˜ ์ž‘์—… ๊ทธ๋ž˜ํ”„ API๋กœ ์ž‘์—…์„ ์˜ค์ผ€์ŠคํŠธ๋ ˆ์ด์…˜ํ•˜๋Š” ๊ธฐ๋Šฅ์„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค.

๋ฐฉํ–ฅ์„ฑ ๋น„์ˆœํ™˜ ๊ทธ๋ž˜ํ”„(DAG)๋ผ๊ณ ๋„ ํ•˜๋Š” ์ž‘์—… ๊ทธ๋ž˜ํ”„๋Š” ๋ฃจํŠธ ์ž‘์—…๊ณผ ํ•˜์œ„ ์ž‘์—…์œผ๋กœ ๊ตฌ์„ฑ๋œ ์ผ๋ จ์˜ ์ž‘์—…์œผ๋กœ, ์ข…์†์„ฑ์— ๋”ฐ๋ผ ๊ตฌ์„ฑ๋ฉ๋‹ˆ๋‹ค. ์ž์„ธํ•œ ๋‚ด์šฉ์€ ์ž‘์—… ๊ทธ๋ž˜ํ”„๋กœ ์ž‘์—… ์‹œํ€€์Šค ์ƒ์„ฑํ•˜๊ธฐ ์„น์…˜์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

  1. ์ž‘์—… ๊ทธ๋ž˜ํ”„๋ฅผ ์ƒ์„ฑํ•˜๊ณ  ๋ฐฐํฌํ•˜๋ ค๋ฉด ๋‹ค์Œ ์ฝ”๋“œ๋ฅผ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.

    dag_name = "python_api_dag"
    dag = DAG(name=dag_name, schedule=timedelta(days=1))
    with dag:
        dag_task1 = DAGTask(
            name="task_python_api_trunc",
            definition=StoredProcedureCall(
                func=trunc,
                stage_location=f"@{tasks_stage}",
                packages=["snowflake-snowpark-python"]),
            warehouse="COMPUTE_WH",
        )
        dag_task2 = DAGTask(
            name="task_python_api_filter",
            definition=StoredProcedureCall(
                func=filter_by_shipmode,
                stage_location=f"@{tasks_stage}",
                packages=["snowflake-snowpark-python"]),
            warehouse="COMPUTE_WH",
        )
        dag_task1 >> dag_task2
    dag_op = DAGOperation(schema)
    dag_op.deploy(dag, mode=CreateMode.or_replace)
    
    Copy

    ์ด ์ฝ”๋“œ์—์„œ๋Š” ๋‹ค์Œ์„ ์ˆ˜ํ–‰ํ•ฉ๋‹ˆ๋‹ค.

    • DAG ์ƒ์„ฑ์ž๋ฅผ ํ˜ธ์ถœํ•˜๊ณ  ์ด๋ฆ„๊ณผ ์ผ์ •์„ ์ง€์ •ํ•˜์—ฌ ์ž‘์—… ๊ทธ๋ž˜ํ”„ ์˜ค๋ธŒ์ ํŠธ๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

    • DAGTask ์ƒ์„ฑ์ž๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ž‘์—… ๊ทธ๋ž˜ํ”„๋ณ„ ์ž‘์—…์„ ์ •์˜ํ•ฉ๋‹ˆ๋‹ค. ์ƒ์„ฑ์ž๋Š” ์ด์ „ ๋‹จ๊ณ„์—์„œ StoredProcedureCall ํด๋ž˜์Šค์— ์ง€์ •ํ•œ ๊ฒƒ๊ณผ ๋™์ผํ•œ ์ธ์ž๋ฅผ ํ—ˆ์šฉํ•œ๋‹ค๋Š” ์ ์— ์œ ์˜ํ•˜์‹ญ์‹œ์˜ค.

    • ๋” ํŽธ๋ฆฌํ•œ ๊ตฌ๋ฌธ์„ ์‚ฌ์šฉํ•˜์—ฌ dag_task1 ์„ ๋ฃจํŠธ ์ž‘์—…์œผ๋กœ ์ง€์ •ํ•˜๊ณ  dag_task2 ์˜ ์ด์ „ ์ž‘์—…์„ ์ง€์ •ํ•ฉ๋‹ˆ๋‹ค.

    • ์ž‘์—… ๊ทธ๋ž˜ํ”„๋ฅผ PYTHON_API_SCHEMA ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์˜ PYTHON_API_DB ์Šคํ‚ค๋งˆ์— ๋ฐฐํฌํ•ฉ๋‹ˆ๋‹ค.

  2. ์ž‘์—… ๊ทธ๋ž˜ํ”„ ์ƒ์„ฑ์„ ํ™•์ธํ•˜๋ ค๋ฉด ๋‹ค์Œ ์…€์—์„œ ๋‹ค์Œ ์ฝ”๋“œ๋ฅผ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.

    taskiter = tasks.iter()
    for t in taskiter:
        print("Name: ", t.name, "| State: ", t.state)
    
    Copy

    ์ž‘์—… ์ƒํƒœ๋ฅผ ํ™•์ธํ•˜๊ณ  ์‹ถ์„ ๋•Œ๋งˆ๋‹ค ์ด ๋‹จ๊ณ„๋ฅผ ๋ฐ˜๋ณตํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  3. ๋ฃจํŠธ ์ž‘์—…์„ ์‹œ์ž‘ํ•˜์—ฌ ์ž‘์—… ๊ทธ๋ž˜ํ”„๋ฅผ ์‹œ์ž‘ํ•˜๋ ค๋ฉด ๋‹ค์Œ ์…€์—์„œ ๋‹ค์Œ ์ฝ”๋“œ๋ฅผ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.

    dag_op.run(dag)
    
    Copy
  4. PYTHON_API_DAG$TASK_PYTHON_API_TRUNC ์ž‘์—…์ด ์‹œ์ž‘๋˜์—ˆ๋Š”์ง€ ํ™•์ธํ•˜๋ ค๋ฉด 2๋‹จ๊ณ„๋ฅผ ๋ฐ˜๋ณตํ•ฉ๋‹ˆ๋‹ค.

    ์ฐธ๊ณ 

    ์ž‘์—… ๊ทธ๋ž˜ํ”„์—์„œ ํ˜ธ์ถœํ•œ ํ•จ์ˆ˜ ํ˜ธ์ถœ์€ ํ•„์ˆ˜ ์ธ์ž๋ฅผ ์‚ฌ์šฉํ•˜์ง€ ์•Š์•˜๊ธฐ ๋•Œ๋ฌธ์— ์„ฑ๊ณตํ•˜์ง€ ๋ชปํ•ฉ๋‹ˆ๋‹ค. ์ด ๋‹จ๊ณ„์˜ ๋ชฉ์ ์€ ์ž‘์—… ๊ทธ๋ž˜ํ”„๋ฅผ ํ”„๋กœ๊ทธ๋ž˜๋ฐ ๋ฐฉ์‹์œผ๋กœ ์‹œ์ž‘ํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ๋ณด์—ฌ์ฃผ๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค.

  5. ์ž‘์—… ๊ทธ๋ž˜ํ”„๋ฅผ ์‚ญ์ œํ•˜๋ ค๋ฉด ๋‹ค์Œ ์…€์—์„œ ๋‹ค์Œ ์ฝ”๋“œ๋ฅผ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.

    dag_op.drop(dag)
    
    Copy
  6. ์ด ์ž์Šต์„œ์—์„œ ์ƒ์„ฑํ•œ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์˜ค๋ธŒ์ ํŠธ๋ฅผ ์ •๋ฆฌํ•ฉ๋‹ˆ๋‹ค.

    database.drop()
    
    Copy

๋‹ค์Œ์—๋Š” ๋ฌด์—‡์„ ํ•ด์•ผ ํ•ฉ๋‹ˆ๊นŒ?ยถ

์ถ•ํ•˜ํ•ฉ๋‹ˆ๋‹ค! ์ด ์ž์Šต์„œ์—์„œ๋Š” Snowflake Python APIs ์„ ์‚ฌ์šฉํ•˜์—ฌ ์ž‘์—… ๋ฐ ์ž‘์—… ๊ทธ๋ž˜ํ”„๋ฅผ ์ƒ์„ฑํ•˜๊ณ  ๊ด€๋ฆฌํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ๋ฐฐ์› ์Šต๋‹ˆ๋‹ค.

์š”์•ฝยถ

์ด ๊ณผ์ •์—์„œ ๋‹ค์Œ ๋‹จ๊ณ„๋ฅผ ์™„๋ฃŒํ–ˆ์Šต๋‹ˆ๋‹ค.

  • ์ €์žฅ ํ”„๋กœ์‹œ์ €์™€ ํ•ด๋‹น ์ข…์†์„ฑ์„ ๋ณด๊ด€ํ•  ์ˆ˜ ์žˆ๋Š” ์Šคํ…Œ์ด์ง€๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

  • ์ž‘์—…์„ ์ƒ์„ฑํ•˜๊ณ  ๊ด€๋ฆฌํ•ฉ๋‹ˆ๋‹ค.

  • ์ž‘์—… ๊ทธ๋ž˜ํ”„๋ฅผ ์ƒ์„ฑํ•˜๊ณ  ๊ด€๋ฆฌํ•ฉ๋‹ˆ๋‹ค.

  • Snowflake ๋ฆฌ์†Œ์Šค ์˜ค๋ธŒ์ ํŠธ๋ฅผ ์‚ญ์ œํ•˜์—ฌ ์ •๋ฆฌํ•ฉ๋‹ˆ๋‹ค.

๋‹ค์Œ ์ž์Šต์„œยถ

์ด์ œ Snowpark Container Services ์—์„œ ์ปดํฌ๋„ŒํŠธ๋ฅผ ์ƒ์„ฑํ•˜๊ณ  ๊ด€๋ฆฌํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ๋ณด์—ฌ์ฃผ๋Š” ์ž์Šต์„œ 3: Snowpark Container Service ๋งŒ๋“ค๊ธฐ ๋ฐ ๊ด€๋ฆฌ ์œผ๋กœ ์ง„ํ–‰ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์ถ”๊ฐ€ ๋ฆฌ์†Œ์Šคยถ

API๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ Snowflake์—์„œ ๋‹ค๋ฅธ ์œ ํ˜•์˜ ์˜ค๋ธŒ์ ํŠธ๋ฅผ ๊ด€๋ฆฌํ•˜๋Š” ๋” ๋งŽ์€ ์˜ˆ์ œ๋Š” ๋‹ค์Œ ๊ฐœ๋ฐœ์ž ๊ฐ€์ด๋“œ๋ฅผ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

๊ฐ€์ด๋“œ

์„ค๋ช…

Python์„ ์‚ฌ์šฉํ•˜์—ฌ Snowflake ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค, ์Šคํ‚ค๋งˆ, ํ…Œ์ด๋ธ” ๋ฐ ๋ทฐ ๊ด€๋ฆฌํ•˜๊ธฐ

๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค, ์Šคํ‚ค๋งˆ, ํ…Œ์ด๋ธ”์„ ์ƒ์„ฑํ•˜๊ณ  ๊ด€๋ฆฌํ•˜๋ ค๋ฉด API๋ฅผ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.

Python์„ ์‚ฌ์šฉํ•˜์—ฌ Snowflake ์‚ฌ์šฉ์ž, ์—ญํ•  ๋ฐ ๊ถŒํ•œ ๊ด€๋ฆฌ

API๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์‚ฌ์šฉ์ž, ์—ญํ•  ๋ฐ ๋ณด์กฐ๊ธˆ์„ ์ƒ์„ฑํ•˜๊ณ  ๊ด€๋ฆฌํ•ฉ๋‹ˆ๋‹ค.

Python์„ ์‚ฌ์šฉํ•˜์—ฌ ๋ฐ์ดํ„ฐ ๋กœ๋”ฉ ๋ฐ ์–ธ๋กœ๋”ฉ ๋ฆฌ์†Œ์Šค ๊ด€๋ฆฌ

API๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์™ธ๋ถ€ ๋ณผ๋ฅจ, ํŒŒ์ดํ”„, ์Šคํ…Œ์ด์ง€๋ฅผ ํฌํ•จํ•œ ๋ฐ์ดํ„ฐ ๋กœ๋”ฉ ๋ฐ ์–ธ๋กœ๋”ฉ ๋ฆฌ์†Œ์Šค๋ฅผ ์ƒ์„ฑํ•˜๊ณ  ๊ด€๋ฆฌํ•ฉ๋‹ˆ๋‹ค.

Python์„ ์‚ฌ์šฉํ•˜์—ฌ Snowpark ์ปจํ…Œ์ด๋„ˆ ์„œ๋น„์Šค(์„œ๋น„์Šค ํ•จ์ˆ˜ ํฌํ•จ) ๊ด€๋ฆฌํ•˜๊ธฐ

API๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ปดํ“จํŒ… ํ’€, ์ด๋ฏธ์ง€ ๋ฆฌํฌ์ง€ํ† ๋ฆฌ, ์„œ๋น„์Šค, ์„œ๋น„์Šค ํ•จ์ˆ˜ ๋“ฑ Snowpark Container Services์˜ ๊ตฌ์„ฑ ์š”์†Œ๋ฅผ ๊ด€๋ฆฌํ•ฉ๋‹ˆ๋‹ค.