# Step 1: Submit the transcription job.
# NOTE(review): the same submit-and-poll sequence appears again directly below
# in this file, so the job is submitted twice — confirm that is intentional.

# Create a time-limited download link for the audio object in S3 so it can be
# handed to the RunPod endpoint in the request payload.
s3 = boto3.client('s3')
presigned_url = s3.generate_presigned_url(
    'get_object',
    Params={'Bucket': BUCKET_NAME, 'Key': av_path},
    ExpiresIn=3600,  # valid for 1 hour
)
# NOTE(review): the presigned URL grants read access to the object until it
# expires; consider whether it should really be written to the logs.
print(presigned_url)

# RunPod API configuration
payload = {
    "input": {
        # Despite the key name, the value sent here is the presigned URL.
        "audio_base64": presigned_url,
        "model": "large-v3",
        "language": "en",
        "beam_size": 5,
        "vad_filter": True,
    }
}
headers = {
    "Authorization": f"Bearer {RUNPOD_API_KEY}",
    "Content-Type": "application/json",
}

start_time = time.time()
response = requests.post(
    f"https://api.runpod.ai/v2/{ENDPOINT_ID}/run",
    json=payload,
    headers=headers,
    timeout=30,  # never block forever on a stalled connection
)
# Surface auth/endpoint errors here instead of as a confusing failure later.
response.raise_for_status()
job = response.json()
print(f"Response data: {job}")
job_id = job.get("id")
if not job_id:
    # Without an id the status URL below would be ".../status/None".
    raise Exception(f"RunPod did not return a job id: {job}")
print(f"Submitted job: {job_id}")

# Step 2: Poll until complete
status_url = f"https://api.runpod.ai/v2/{ENDPOINT_ID}/status/{job_id}"
while True:
    time.sleep(3)
    status_response = requests.get(status_url, headers=headers, timeout=30)
    status_response.raise_for_status()
    status = status_response.json()
    # .get() so an unexpected payload is reported below rather than raising
    # a bare KeyError.
    job_state = status.get("status")
    if job_state == "COMPLETED":
        break
    if job_state == "FAILED":
        raise Exception("Transcription job failed.")
    if job_state in ("CANCELLED", "TIMED_OUT"):
        # These states are terminal too; without this check the loop would
        # keep polling a job that can never complete.
        raise Exception(f"Transcription job ended with status {job_state}.")
    print(f"Job status: {status}")
# NOTE(review): this section repeats the submit-and-poll sequence that appears
# immediately above in this file, so the transcription job is submitted a
# second time and every variable below is overwritten. Confirm the second run
# is intentional (e.g. a warm-up followed by a timed run); otherwise remove
# one copy.

# Create a time-limited download link for the audio object in S3 so it can be
# handed to the RunPod endpoint in the request payload.
s3 = boto3.client('s3')
presigned_url = s3.generate_presigned_url(
    'get_object',
    Params={'Bucket': BUCKET_NAME, 'Key': av_path},
    ExpiresIn=3600,  # valid for 1 hour
)
# NOTE(review): the presigned URL grants read access to the object until it
# expires; consider whether it should really be written to the logs.
print(presigned_url)

# RunPod API configuration
payload = {
    "input": {
        # Despite the key name, the value sent here is the presigned URL.
        "audio_base64": presigned_url,
        "model": "large-v3",
        "language": "en",
        "beam_size": 5,
        "vad_filter": True,
    }
}
headers = {
    "Authorization": f"Bearer {RUNPOD_API_KEY}",
    "Content-Type": "application/json",
}

start_time = time.time()
response = requests.post(
    f"https://api.runpod.ai/v2/{ENDPOINT_ID}/run",
    json=payload,
    headers=headers,
    timeout=30,  # never block forever on a stalled connection
)
# Surface auth/endpoint errors here instead of as a confusing failure later.
response.raise_for_status()
job = response.json()
print(f"Response data: {job}")
job_id = job.get("id")
if not job_id:
    # Without an id the status URL below would be ".../status/None".
    raise Exception(f"RunPod did not return a job id: {job}")
print(f"Submitted job: {job_id}")

# Step 2: Poll until complete
status_url = f"https://api.runpod.ai/v2/{ENDPOINT_ID}/status/{job_id}"
while True:
    time.sleep(3)
    status_response = requests.get(status_url, headers=headers, timeout=30)
    status_response.raise_for_status()
    status = status_response.json()
    # .get() so an unexpected payload is reported below rather than raising
    # a bare KeyError.
    job_state = status.get("status")
    if job_state == "COMPLETED":
        break
    if job_state == "FAILED":
        raise Exception("Transcription job failed.")
    if job_state in ("CANCELLED", "TIMED_OUT"):
        # These states are terminal too; without this check the loop would
        # keep polling a job that can never complete.
        raise Exception(f"Transcription job ended with status {job_state}.")
    print(f"Job status: {status}")