ffmpeg -y -i "https://knowledge-base-crmone.s3.amazonaws.com/final_videos/1769751447_contact_test_video_spanish_1769751988095.mp4?AWSAccessKeyId=AKIAYE24AMHCYVHMJYVD&Signature=G1v4jW80gmzDu9thJbULbAGooyk%3D&Expires=1770356801"
-vf "scale=eval=frame:w=trunc(iw*(1+(1.5-1)*(between(t,15,16)*((t-15)/(16-15))+between(t,16,36)+between(t,36,37)*(1-((t-36)/(37-36))))/2)*2):h=trunc(ih*(1+(1.5-1)*(between(t,15,16)*((t-15)/(16-15))+between(t,16,36)+between(t,36,37)*(1-((t-36)/(37-36))))/2)*2),crop=iw:ih:(in_w-out_w)*0.69:(in_h-out_h)*0.55"
-c:v libx264 -pix_fmt yuv420p -preset slow -crf 18 -c:a copy output_69_55.mp4
I use this command to zoom. Zoom is working perfectly but position of the zoom is not working.
Below is my Python function:
import os
import subprocess
import requests
import tempfile
import time
from video_pipeline.tools.s3_uploader import upload_file_to_s3
from video_pipeline.helper.file_naming import generate_unique_output_filename
# def edit_generated_video(
# id: str,
# input_url: str,
# zoom_in_start: float,
# zoom_in_end: float,
# hold_start: float,
# hold_end: float,
# zoom_out_start: float,
# zoom_out_end: float,
# zoom_x: float,
# zoom_y: float,
# max_zoom: float = 1.5,
# ):
# temp_input_path = None
# temp_output_path = None
# try:
# # Download input video
# with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_input:
# response = requests.get(input_url, stream=True)
# response.raise_for_status()
# for chunk in response.iter_content(chunk_size=8192):
# temp_input.write(chunk)
# temp_input_path = temp_input.name
# # Prepare temp output
# with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_output:
# temp_output_path = temp_output.name
# # zoom_expr = (
# # f"(1+({max_zoom}-1)*("
# # f"between(t\,{zoom_in_start}\,{zoom_in_end})"
# # f"* ((t-{zoom_in_start})/({zoom_in_end}-{zoom_in_start}))"
# # f"+ between(t\,{hold_start}\,{hold_end})"
# # f"+ between(t\,{zoom_out_start}\,{zoom_out_end})"
# # f"* (1-((t-{zoom_out_start})/({zoom_out_end}-{zoom_out_start})))"
# # f"))"
# # )
# # vf_filter = (
# # f"crop="
# # f"w=iw/{zoom_expr}:"
# # f"h=ih/{zoom_expr}:"
# # f"x=(iw-out_w)*{zoom_x}:"
# # f"y=(ih-out_h)*{zoom_y},"
# # f"scale=iw:ih"
# # )
# # cmd = [
# # "ffmpeg", "-y",
# # "-i", temp_input_path,
# # "-vf", vf_filter,
# # "-c:v", "libx264",
# # "-pix_fmt", "yuv420p",
# # "-preset", "slow",
# # "-crf", "18",
# # "-c:a", "copy",
# # temp_output_path
# # ]
# # 1. Define the zoom level expression using your variables
# # We calculate the zoom factor (1 to 1.5) based on time
# zoom_expr = (
# f"(1 + ({max_zoom} - 1) * ("
# f"between(t, {zoom_in_start}, {zoom_in_end}) * ((t - {zoom_in_start}) / ({zoom_in_end} - {zoom_in_start})) + "
# f"between(t, {hold_start}, {hold_end}) + "
# f"between(t, {zoom_out_start}, {zoom_out_end}) * (1 - ((t - {zoom_out_start}) / ({zoom_out_end} - {zoom_out_start})))"
# f"))"
# )
# # 2. Build the Video Filter (vf) string
# # We use the scale-then-crop method you found successful
# vf_filter = (
# f"scale=eval=frame:"
# f"w=trunc(iw*{zoom_expr}/2)*2:"
# f"h=trunc(ih*{zoom_expr}/2)*2,"
# f"crop=iw:ih:"
# f"(in_w-out_w)*{zoom_x/100}:" # Dividing by 100 assuming zoom_x is a percentage (0-100)
# f"(in_h-out_h)*{zoom_y/100}"
# )
# cmd = [
# "ffmpeg", "-y",
# "-i", temp_input_path,
# "-vf", vf_filter,
# "-c:v", "libx264",
# "-pix_fmt", "yuv420p",
# "-preset", "slow",
# "-crf", "18",
# "-c:a", "copy",
# temp_output_path
# ]
# subprocess.run(cmd, check=True)
# # Upload to S3
# s3_key = f"{id}/generated_video_edited.mp4"
# s3_url = upload_file_to_s3(file_path=temp_output_path, s3_key=s3_key)
# print(f"Edited video uploaded to S3: {s3_url}")
# return s3_url
# except Exception as e:
# print(f"Error in edit_generated_video: {e}")
# return None
# finally:
# # Cleanup
# if temp_input_path and os.path.exists(temp_input_path):
# os.remove(temp_input_path)
# if temp_output_path and os.path.exists(temp_output_path):
# os.remove(temp_output_path)
def edit_generated_video(
id: str,
lang: str,
input_url: str,
zoom_in_start: float,
zoom_in_end: float,
hold_start: float,
hold_end: float,
zoom_out_start: float,
zoom_out_end: float,
zoom_x: float,
zoom_y: float,
max_zoom: float = 1.3,
):
temp_input_path = None
temp_output_path = None
try:
# Download input video
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_input:
response = requests.get(input_url, stream=True)
response.raise_for_status()
for chunk in response.iter_content(chunk_size=8192):
temp_input.write(chunk)
temp_input_path = temp_input.name
# Prepare temp output
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_output:
temp_output_path = temp_output.name
# --- THE FIX STARTS HERE ---
# We define a variable for the escaped comma to make the f-string readable.
# In Python strings, "\" becomes a single "" which FFmpeg needs to escape the comma.
c = "\,"
# Construct the Zoom Expression
# We replace normal commas with our escaped 'c' variable
zoom_expr = (
f"(1+({max_zoom}-1)*("
f"between(t{c}{zoom_in_start}{c}{zoom_in_end})*"
f"((t-{zoom_in_start})/({zoom_in_end}-{zoom_in_start}))+"
f"between(t{c}{hold_start}{c}{hold_end})+"
f"between(t{c}{zoom_out_start}{c}{zoom_out_end})*"
f"(1-((t-{zoom_out_start})/({zoom_out_end}-{zoom_out_start})))"
f"))"
)
# Construct the Filter String
vf_filter = (
f"scale=eval=frame:"
f"w=trunc(iw*{zoom_expr}/2)*2:"
f"h=trunc(ih*{zoom_expr}/2)*2,"
f"crop=iw:ih:"
f"(in_w-out_w)*{zoom_x/100}:" # Assuming zoom_x is 0-100 (e.g. 69.0)
f"(in_h-out_h)*{zoom_y/100}" # Assuming zoom_y is 0-100 (e.g. 55.0)
)
# ---------------------------
cmd = [
"ffmpeg", "-y",
"-i", temp_input_path,
"-vf", vf_filter,
"-c:v", "libx264",
"-pix_fmt", "yuv420p",
"-preset", "slow",
"-crf", "18",
"-c:a", "copy",
temp_output_path
]
# Debug: Print the command
print(f"Executing: {' '.join(cmd)}")
subprocess.run(cmd, check=True)
final_video_name = generate_unique_output_filename(
original_video_path=temp_output_path,
language=lang,
prefix="video",
extension="mp4",
)
# Upload to S3
s3_key = f"edited_video/{os.path.basename(final_video_name)}"
s3_url = upload_file_to_s3(file_path=temp_output_path, s3_key=s3_key)
print(f"Edited video uploaded to S3: {s3_url}")
return {"s3_key" : s3_key, "download_url" : s3_url}
except subprocess.CalledProcessError as e:
print(f"FFmpeg failed with error code {e.returncode}")
return None
except Exception as e:
print(f"Error in edit_generated_video: {e}")
return None
finally:
if temp_input_path and os.path.exists(temp_input_path):
os.remove(temp_input_path)
if temp_output_path and os.path.exists(temp_output_path):
os.remove(temp_output_path)