# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Code generated by the Google Gen AI SDK generator DO NOT EDIT.
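
# This module backs `client.batches`. The `_X_to_mldev` / `_X_to_vertex`
# helpers below translate SDK-style snake_case dicts into the Gemini
# Developer API and Vertex AI wire formats, the `_X_from_*` helpers do the
# reverse, and the `Batches` / `AsyncBatches` modules compose them into
# create/get/cancel/list/delete calls.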

import json
import logging
from typing import Any, Optional, Union
from urllib.parse import urlencode

from . import _api_module
from . import _common
from . import _extra_utils
from . import _transformers as t
from . import types
from ._api_client import BaseApiClient
from ._common import get_value_by_path as getv
from ._common import move_value_by_path as movev
from ._common import set_value_by_path as setv
from .pagers import AsyncPager, Pager


logger = logging.getLogger('google_genai.batches')
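
# Converter helpers share one pattern: read a source field with
# getv(obj, path) and, when it is not None, write it to the destination
# path with setv. Paths are lists of keys, and getv returns None when any
# segment is missing, so each field is copied only if it was actually set.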


def _BatchJobDestination_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}

  if getv(from_object, ['responsesFile']) is not None:
    setv(to_object, ['file_name'], getv(from_object, ['responsesFile']))

  if getv(from_object, ['inlinedResponses', 'inlinedResponses']) is not None:
    setv(
        to_object,
        ['inlined_responses'],
        [
            _InlinedResponse_from_mldev(item, to_object)
            for item in getv(
                from_object, ['inlinedResponses', 'inlinedResponses']
            )
        ],
    )

  if (
      getv(from_object, ['inlinedEmbedContentResponses', 'inlinedResponses'])
      is not None
  ):
    setv(
        to_object,
        ['inlined_embed_content_responses'],
        [
            item
            for item in getv(
                from_object,
                ['inlinedEmbedContentResponses', 'inlinedResponses'],
            )
        ],
    )

  return to_object


def _BatchJobDestination_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['predictionsFormat']) is not None:
    setv(to_object, ['format'], getv(from_object, ['predictionsFormat']))

  if getv(from_object, ['gcsDestination', 'outputUriPrefix']) is not None:
    setv(
        to_object,
        ['gcs_uri'],
        getv(from_object, ['gcsDestination', 'outputUriPrefix']),
    )

  if getv(from_object, ['bigqueryDestination', 'outputUri']) is not None:
    setv(
        to_object,
        ['bigquery_uri'],
        getv(from_object, ['bigqueryDestination', 'outputUri']),
    )

  return to_object


def _BatchJobDestination_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['format']) is not None:
    setv(to_object, ['predictionsFormat'], getv(from_object, ['format']))

  if getv(from_object, ['gcs_uri']) is not None:
    setv(
        to_object,
        ['gcsDestination', 'outputUriPrefix'],
        getv(from_object, ['gcs_uri']),
    )

  if getv(from_object, ['bigquery_uri']) is not None:
    setv(
        to_object,
        ['bigqueryDestination', 'outputUri'],
        getv(from_object, ['bigquery_uri']),
    )

  if getv(from_object, ['file_name']) is not None:
    raise ValueError('file_name parameter is not supported in Vertex AI.')

  if getv(from_object, ['inlined_responses']) is not None:
    raise ValueError(
        'inlined_responses parameter is not supported in Vertex AI.'
    )

  if getv(from_object, ['inlined_embed_content_responses']) is not None:
    raise ValueError(
        'inlined_embed_content_responses parameter is not supported in'
        ' Vertex AI.'
    )

  return to_object


def _BatchJobSource_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['instancesFormat']) is not None:
    setv(to_object, ['format'], getv(from_object, ['instancesFormat']))

  if getv(from_object, ['gcsSource', 'uris']) is not None:
    setv(to_object, ['gcs_uri'], getv(from_object, ['gcsSource', 'uris']))

  if getv(from_object, ['bigquerySource', 'inputUri']) is not None:
    setv(
        to_object,
        ['bigquery_uri'],
        getv(from_object, ['bigquerySource', 'inputUri']),
    )

  return to_object


def _BatchJobSource_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['format']) is not None:
    raise ValueError('format parameter is not supported in Gemini API.')

  if getv(from_object, ['gcs_uri']) is not None:
    raise ValueError('gcs_uri parameter is not supported in Gemini API.')

  if getv(from_object, ['bigquery_uri']) is not None:
    raise ValueError('bigquery_uri parameter is not supported in Gemini API.')

  if getv(from_object, ['file_name']) is not None:
    setv(to_object, ['fileName'], getv(from_object, ['file_name']))

  if getv(from_object, ['inlined_requests']) is not None:
    setv(
        to_object,
        ['requests', 'requests'],
        [
            _InlinedRequest_to_mldev(api_client, item, to_object)
            for item in getv(from_object, ['inlined_requests'])
        ],
    )

  return to_object


def _BatchJobSource_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['format']) is not None:
    setv(to_object, ['instancesFormat'], getv(from_object, ['format']))

  if getv(from_object, ['gcs_uri']) is not None:
    setv(to_object, ['gcsSource', 'uris'], getv(from_object, ['gcs_uri']))

  if getv(from_object, ['bigquery_uri']) is not None:
    setv(
        to_object,
        ['bigquerySource', 'inputUri'],
        getv(from_object, ['bigquery_uri']),
    )

  if getv(from_object, ['file_name']) is not None:
    raise ValueError('file_name parameter is not supported in Vertex AI.')

  if getv(from_object, ['inlined_requests']) is not None:
    raise ValueError(
        'inlined_requests parameter is not supported in Vertex AI.'
    )

  return to_object


def _BatchJob_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['name']) is not None:
    setv(to_object, ['name'], getv(from_object, ['name']))

  if getv(from_object, ['metadata', 'displayName']) is not None:
    setv(
        to_object,
        ['display_name'],
        getv(from_object, ['metadata', 'displayName']),
    )

  if getv(from_object, ['metadata', 'state']) is not None:
    setv(
        to_object,
        ['state'],
        t.t_job_state(getv(from_object, ['metadata', 'state'])),
    )

  if getv(from_object, ['metadata', 'createTime']) is not None:
    setv(
        to_object,
        ['create_time'],
        getv(from_object, ['metadata', 'createTime']),
    )

  if getv(from_object, ['metadata', 'endTime']) is not None:
    setv(to_object, ['end_time'], getv(from_object, ['metadata', 'endTime']))

  if getv(from_object, ['metadata', 'updateTime']) is not None:
    setv(
        to_object,
        ['update_time'],
        getv(from_object, ['metadata', 'updateTime']),
    )

  if getv(from_object, ['metadata', 'model']) is not None:
    setv(to_object, ['model'], getv(from_object, ['metadata', 'model']))

  if getv(from_object, ['metadata', 'output']) is not None:
    setv(
        to_object,
        ['dest'],
        _BatchJobDestination_from_mldev(
            t.t_recv_batch_job_destination(
                getv(from_object, ['metadata', 'output'])
            ),
            to_object,
        ),
    )

  return to_object


def _BatchJob_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['name']) is not None:
    setv(to_object, ['name'], getv(from_object, ['name']))

  if getv(from_object, ['displayName']) is not None:
    setv(to_object, ['display_name'], getv(from_object, ['displayName']))

  if getv(from_object, ['state']) is not None:
    setv(to_object, ['state'], t.t_job_state(getv(from_object, ['state'])))

  if getv(from_object, ['error']) is not None:
    setv(to_object, ['error'], getv(from_object, ['error']))

  if getv(from_object, ['createTime']) is not None:
    setv(to_object, ['create_time'], getv(from_object, ['createTime']))

  if getv(from_object, ['startTime']) is not None:
    setv(to_object, ['start_time'], getv(from_object, ['startTime']))

  if getv(from_object, ['endTime']) is not None:
    setv(to_object, ['end_time'], getv(from_object, ['endTime']))

  if getv(from_object, ['updateTime']) is not None:
    setv(to_object, ['update_time'], getv(from_object, ['updateTime']))

  if getv(from_object, ['model']) is not None:
    setv(to_object, ['model'], getv(from_object, ['model']))

  if getv(from_object, ['inputConfig']) is not None:
    setv(
        to_object,
        ['src'],
        _BatchJobSource_from_vertex(
            getv(from_object, ['inputConfig']), to_object
        ),
    )

  if getv(from_object, ['outputConfig']) is not None:
    setv(
        to_object,
        ['dest'],
        _BatchJobDestination_from_vertex(
            t.t_recv_batch_job_destination(getv(from_object, ['outputConfig'])),
            to_object,
        ),
    )

  if getv(from_object, ['completionStats']) is not None:
    setv(
        to_object, ['completion_stats'], getv(from_object, ['completionStats'])
    )

  return to_object


def _Blob_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['data']) is not None:
    setv(to_object, ['data'], getv(from_object, ['data']))

  if getv(from_object, ['display_name']) is not None:
    raise ValueError('display_name parameter is not supported in Gemini API.')

  if getv(from_object, ['mime_type']) is not None:
    setv(to_object, ['mimeType'], getv(from_object, ['mime_type']))

  return to_object


def _CancelBatchJobParameters_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['name']) is not None:
    setv(
        to_object,
        ['_url', 'name'],
        t.t_batch_job_name(api_client, getv(from_object, ['name'])),
    )

  return to_object


def _CancelBatchJobParameters_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['name']) is not None:
    setv(
        to_object,
        ['_url', 'name'],
        t.t_batch_job_name(api_client, getv(from_object, ['name'])),
    )

  return to_object


def _Candidate_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['content']) is not None:
    setv(to_object, ['content'], getv(from_object, ['content']))

  if getv(from_object, ['citationMetadata']) is not None:
    setv(
        to_object,
        ['citation_metadata'],
        _CitationMetadata_from_mldev(
            getv(from_object, ['citationMetadata']), to_object
        ),
    )

  if getv(from_object, ['tokenCount']) is not None:
    setv(to_object, ['token_count'], getv(from_object, ['tokenCount']))

  if getv(from_object, ['finishReason']) is not None:
    setv(to_object, ['finish_reason'], getv(from_object, ['finishReason']))

  if getv(from_object, ['avgLogprobs']) is not None:
    setv(to_object, ['avg_logprobs'], getv(from_object, ['avgLogprobs']))

  if getv(from_object, ['groundingMetadata']) is not None:
    setv(
        to_object,
        ['grounding_metadata'],
        getv(from_object, ['groundingMetadata']),
    )

  if getv(from_object, ['index']) is not None:
    setv(to_object, ['index'], getv(from_object, ['index']))

  if getv(from_object, ['logprobsResult']) is not None:
    setv(to_object, ['logprobs_result'], getv(from_object, ['logprobsResult']))

  if getv(from_object, ['safetyRatings']) is not None:
    setv(
        to_object,
        ['safety_ratings'],
        [item for item in getv(from_object, ['safetyRatings'])],
    )

  if getv(from_object, ['urlContextMetadata']) is not None:
    setv(
        to_object,
        ['url_context_metadata'],
        getv(from_object, ['urlContextMetadata']),
    )

  return to_object


def _CitationMetadata_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['citationSources']) is not None:
    setv(
        to_object,
        ['citations'],
        [item for item in getv(from_object, ['citationSources'])],
    )

  return to_object


def _Content_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['parts']) is not None:
    setv(
        to_object,
        ['parts'],
        [
            _Part_to_mldev(item, to_object)
            for item in getv(from_object, ['parts'])
        ],
    )

  if getv(from_object, ['role']) is not None:
    setv(to_object, ['role'], getv(from_object, ['role']))

  return to_object


def _CreateBatchJobConfig_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}

  if getv(from_object, ['display_name']) is not None:
    setv(
        parent_object,
        ['batch', 'displayName'],
        getv(from_object, ['display_name']),
    )

  if getv(from_object, ['dest']) is not None:
    raise ValueError('dest parameter is not supported in Gemini API.')

  return to_object


def _CreateBatchJobConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}

  if getv(from_object, ['display_name']) is not None:
    setv(parent_object, ['displayName'], getv(from_object, ['display_name']))

  if getv(from_object, ['dest']) is not None:
    setv(
        parent_object,
        ['outputConfig'],
        _BatchJobDestination_to_vertex(
            t.t_batch_job_destination(getv(from_object, ['dest'])), to_object
        ),
    )

  return to_object


def _CreateBatchJobParameters_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['model']) is not None:
    setv(
        to_object,
        ['_url', 'model'],
        t.t_model(api_client, getv(from_object, ['model'])),
    )

  if getv(from_object, ['src']) is not None:
    setv(
        to_object,
        ['batch', 'inputConfig'],
        _BatchJobSource_to_mldev(
            api_client,
            t.t_batch_job_source(api_client, getv(from_object, ['src'])),
            to_object,
        ),
    )

  if getv(from_object, ['config']) is not None:
    _CreateBatchJobConfig_to_mldev(getv(from_object, ['config']), to_object)

  return to_object


def _CreateBatchJobParameters_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['model']) is not None:
    setv(
        to_object,
        ['model'],
        t.t_model(api_client, getv(from_object, ['model'])),
    )

  if getv(from_object, ['src']) is not None:
    setv(
        to_object,
        ['inputConfig'],
        _BatchJobSource_to_vertex(
            t.t_batch_job_source(api_client, getv(from_object, ['src'])),
            to_object,
        ),
    )

  if getv(from_object, ['config']) is not None:
    _CreateBatchJobConfig_to_vertex(getv(from_object, ['config']), to_object)

  return to_object


def _CreateEmbeddingsBatchJobConfig_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}

  if getv(from_object, ['display_name']) is not None:
    setv(
        parent_object,
        ['batch', 'displayName'],
        getv(from_object, ['display_name']),
    )

  return to_object


def _CreateEmbeddingsBatchJobParameters_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['model']) is not None:
    setv(
        to_object,
        ['_url', 'model'],
        t.t_model(api_client, getv(from_object, ['model'])),
    )

  if getv(from_object, ['src']) is not None:
    setv(
        to_object,
        ['batch', 'inputConfig'],
        _EmbeddingsBatchJobSource_to_mldev(
            api_client, getv(from_object, ['src']), to_object
        ),
    )

  if getv(from_object, ['config']) is not None:
    _CreateEmbeddingsBatchJobConfig_to_mldev(
        getv(from_object, ['config']), to_object
    )

  return to_object


def _DeleteBatchJobParameters_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['name']) is not None:
    setv(
        to_object,
        ['_url', 'name'],
        t.t_batch_job_name(api_client, getv(from_object, ['name'])),
    )

  return to_object


def _DeleteBatchJobParameters_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['name']) is not None:
    setv(
        to_object,
        ['_url', 'name'],
        t.t_batch_job_name(api_client, getv(from_object, ['name'])),
    )

  return to_object


def _DeleteResourceJob_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['sdkHttpResponse']) is not None:
    setv(
        to_object, ['sdk_http_response'], getv(from_object, ['sdkHttpResponse'])
    )

  if getv(from_object, ['name']) is not None:
    setv(to_object, ['name'], getv(from_object, ['name']))

  if getv(from_object, ['done']) is not None:
    setv(to_object, ['done'], getv(from_object, ['done']))

  if getv(from_object, ['error']) is not None:
    setv(to_object, ['error'], getv(from_object, ['error']))

  return to_object


def _DeleteResourceJob_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['sdkHttpResponse']) is not None:
    setv(
        to_object, ['sdk_http_response'], getv(from_object, ['sdkHttpResponse'])
    )

  if getv(from_object, ['name']) is not None:
    setv(to_object, ['name'], getv(from_object, ['name']))

  if getv(from_object, ['done']) is not None:
    setv(to_object, ['done'], getv(from_object, ['done']))

  if getv(from_object, ['error']) is not None:
    setv(to_object, ['error'], getv(from_object, ['error']))

  return to_object


def _EmbedContentBatch_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['contents']) is not None:
    setv(
        to_object,
        ['requests[]', 'request', 'content'],
        [
            item
            for item in t.t_contents_for_embed(
                api_client, getv(from_object, ['contents'])
            )
        ],
    )

  if getv(from_object, ['config']) is not None:
    setv(
        to_object,
        ['_self'],
        _EmbedContentConfig_to_mldev(getv(from_object, ['config']), to_object),
    )
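    # _EmbedContentConfig_to_mldev writes the shared config fields (taskType,
    # title, outputDimensionality) at the top level of every `requests[]`
    # element; movev then relocates them under each element's `request`
    # object to match the asyncBatchEmbedContent wire format.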
    movev(to_object, {'requests[].*': 'requests[].request.*'})
  return to_object


def _EmbedContentConfig_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}

  if getv(from_object, ['task_type']) is not None:
    setv(
        parent_object,
        ['requests[]', 'taskType'],
        getv(from_object, ['task_type']),
    )

  if getv(from_object, ['title']) is not None:
    setv(parent_object, ['requests[]', 'title'], getv(from_object, ['title']))

  if getv(from_object, ['output_dimensionality']) is not None:
    setv(
        parent_object,
        ['requests[]', 'outputDimensionality'],
        getv(from_object, ['output_dimensionality']),
    )

  if getv(from_object, ['mime_type']) is not None:
    raise ValueError('mime_type parameter is not supported in Gemini API.')

  if getv(from_object, ['auto_truncate']) is not None:
    raise ValueError('auto_truncate parameter is not supported in Gemini API.')

  return to_object


def _EmbeddingsBatchJobSource_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['file_name']) is not None:
    setv(to_object, ['file_name'], getv(from_object, ['file_name']))

  if getv(from_object, ['inlined_requests']) is not None:
    setv(
        to_object,
        ['requests'],
        _EmbedContentBatch_to_mldev(
            api_client, getv(from_object, ['inlined_requests']), to_object
        ),
    )

  return to_object


def _FileData_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['display_name']) is not None:
    raise ValueError('display_name parameter is not supported in Gemini API.')

  if getv(from_object, ['file_uri']) is not None:
    setv(to_object, ['fileUri'], getv(from_object, ['file_uri']))

  if getv(from_object, ['mime_type']) is not None:
    setv(to_object, ['mimeType'], getv(from_object, ['mime_type']))

  return to_object


def _GenerateContentConfig_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
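
  # Request-level fields (systemInstruction, safetySettings, tools,
  # toolConfig, cachedContent) are written onto parent_object, while the
  # sampling parameters stay in to_object, which the caller nests under
  # 'generationConfig'.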

  if getv(from_object, ['system_instruction']) is not None:
    setv(
        parent_object,
        ['systemInstruction'],
        _Content_to_mldev(
            t.t_content(getv(from_object, ['system_instruction'])), to_object
        ),
    )

  if getv(from_object, ['temperature']) is not None:
    setv(to_object, ['temperature'], getv(from_object, ['temperature']))

  if getv(from_object, ['top_p']) is not None:
    setv(to_object, ['topP'], getv(from_object, ['top_p']))

  if getv(from_object, ['top_k']) is not None:
    setv(to_object, ['topK'], getv(from_object, ['top_k']))

  if getv(from_object, ['candidate_count']) is not None:
    setv(to_object, ['candidateCount'], getv(from_object, ['candidate_count']))

  if getv(from_object, ['max_output_tokens']) is not None:
    setv(
        to_object, ['maxOutputTokens'], getv(from_object, ['max_output_tokens'])
    )

  if getv(from_object, ['stop_sequences']) is not None:
    setv(to_object, ['stopSequences'], getv(from_object, ['stop_sequences']))

  if getv(from_object, ['response_logprobs']) is not None:
    setv(
        to_object,
        ['responseLogprobs'],
        getv(from_object, ['response_logprobs']),
    )

  if getv(from_object, ['logprobs']) is not None:
    setv(to_object, ['logprobs'], getv(from_object, ['logprobs']))

  if getv(from_object, ['presence_penalty']) is not None:
    setv(
        to_object, ['presencePenalty'], getv(from_object, ['presence_penalty'])
    )

  if getv(from_object, ['frequency_penalty']) is not None:
    setv(
        to_object,
        ['frequencyPenalty'],
        getv(from_object, ['frequency_penalty']),
    )

  if getv(from_object, ['seed']) is not None:
    setv(to_object, ['seed'], getv(from_object, ['seed']))

  if getv(from_object, ['response_mime_type']) is not None:
    setv(
        to_object,
        ['responseMimeType'],
        getv(from_object, ['response_mime_type']),
    )

  if getv(from_object, ['response_schema']) is not None:
    setv(
        to_object,
        ['responseSchema'],
        t.t_schema(api_client, getv(from_object, ['response_schema'])),
    )

  if getv(from_object, ['response_json_schema']) is not None:
    setv(
        to_object,
        ['responseJsonSchema'],
        getv(from_object, ['response_json_schema']),
    )

  if getv(from_object, ['routing_config']) is not None:
    raise ValueError('routing_config parameter is not supported in Gemini API.')

  if getv(from_object, ['model_selection_config']) is not None:
    raise ValueError(
        'model_selection_config parameter is not supported in Gemini API.'
    )

  if getv(from_object, ['safety_settings']) is not None:
    setv(
        parent_object,
        ['safetySettings'],
        [
            _SafetySetting_to_mldev(item, to_object)
            for item in getv(from_object, ['safety_settings'])
        ],
    )

  if getv(from_object, ['tools']) is not None:
    setv(
        parent_object,
        ['tools'],
        [
            _Tool_to_mldev(t.t_tool(api_client, item), to_object)
            for item in t.t_tools(api_client, getv(from_object, ['tools']))
        ],
    )

  if getv(from_object, ['tool_config']) is not None:
    setv(parent_object, ['toolConfig'], getv(from_object, ['tool_config']))

  if getv(from_object, ['labels']) is not None:
    raise ValueError('labels parameter is not supported in Gemini API.')

  if getv(from_object, ['cached_content']) is not None:
    setv(
        parent_object,
        ['cachedContent'],
        t.t_cached_content_name(
            api_client, getv(from_object, ['cached_content'])
        ),
    )

  if getv(from_object, ['response_modalities']) is not None:
    setv(
        to_object,
        ['responseModalities'],
        getv(from_object, ['response_modalities']),
    )

  if getv(from_object, ['media_resolution']) is not None:
    setv(
        to_object, ['mediaResolution'], getv(from_object, ['media_resolution'])
    )

  if getv(from_object, ['speech_config']) is not None:
    setv(
        to_object,
        ['speechConfig'],
        t.t_speech_config(getv(from_object, ['speech_config'])),
    )

  if getv(from_object, ['audio_timestamp']) is not None:
    raise ValueError(
        'audio_timestamp parameter is not supported in Gemini API.'
    )

  if getv(from_object, ['thinking_config']) is not None:
    setv(to_object, ['thinkingConfig'], getv(from_object, ['thinking_config']))

  if getv(from_object, ['image_config']) is not None:
    setv(to_object, ['imageConfig'], getv(from_object, ['image_config']))

  return to_object


def _GenerateContentResponse_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['sdkHttpResponse']) is not None:
    setv(
        to_object, ['sdk_http_response'], getv(from_object, ['sdkHttpResponse'])
    )

  if getv(from_object, ['candidates']) is not None:
    setv(
        to_object,
        ['candidates'],
        [
            _Candidate_from_mldev(item, to_object)
            for item in getv(from_object, ['candidates'])
        ],
    )

  if getv(from_object, ['modelVersion']) is not None:
    setv(to_object, ['model_version'], getv(from_object, ['modelVersion']))

  if getv(from_object, ['promptFeedback']) is not None:
    setv(to_object, ['prompt_feedback'], getv(from_object, ['promptFeedback']))

  if getv(from_object, ['responseId']) is not None:
    setv(to_object, ['response_id'], getv(from_object, ['responseId']))

  if getv(from_object, ['usageMetadata']) is not None:
    setv(to_object, ['usage_metadata'], getv(from_object, ['usageMetadata']))

  return to_object


def _GetBatchJobParameters_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['name']) is not None:
    setv(
        to_object,
        ['_url', 'name'],
        t.t_batch_job_name(api_client, getv(from_object, ['name'])),
    )

  return to_object


def _GetBatchJobParameters_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['name']) is not None:
    setv(
        to_object,
        ['_url', 'name'],
        t.t_batch_job_name(api_client, getv(from_object, ['name'])),
    )

  return to_object


def _GoogleMaps_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['auth_config']) is not None:
    raise ValueError('auth_config parameter is not supported in Gemini API.')

  if getv(from_object, ['enable_widget']) is not None:
    setv(to_object, ['enableWidget'], getv(from_object, ['enable_widget']))

  return to_object


def _GoogleSearch_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['exclude_domains']) is not None:
    raise ValueError(
        'exclude_domains parameter is not supported in Gemini API.'
    )

  if getv(from_object, ['blocking_confidence']) is not None:
    raise ValueError(
        'blocking_confidence parameter is not supported in Gemini API.'
    )

  if getv(from_object, ['time_range_filter']) is not None:
    setv(
        to_object, ['timeRangeFilter'], getv(from_object, ['time_range_filter'])
    )

  return to_object


def _InlinedRequest_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['model']) is not None:
    setv(
        to_object,
        ['request', 'model'],
        t.t_model(api_client, getv(from_object, ['model'])),
    )

  if getv(from_object, ['contents']) is not None:
    setv(
        to_object,
        ['request', 'contents'],
        [
            _Content_to_mldev(item, to_object)
            for item in t.t_contents(getv(from_object, ['contents']))
        ],
    )

  if getv(from_object, ['metadata']) is not None:
    setv(to_object, ['metadata'], getv(from_object, ['metadata']))

  if getv(from_object, ['config']) is not None:
    setv(
        to_object,
        ['request', 'generationConfig'],
        _GenerateContentConfig_to_mldev(
            api_client,
            getv(from_object, ['config']),
            getv(to_object, ['request'], default_value={}),
        ),
    )

  return to_object


def _InlinedResponse_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['response']) is not None:
    setv(
        to_object,
        ['response'],
        _GenerateContentResponse_from_mldev(
            getv(from_object, ['response']), to_object
        ),
    )

  if getv(from_object, ['error']) is not None:
    setv(to_object, ['error'], getv(from_object, ['error']))

  return to_object


def _ListBatchJobsConfig_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}

  if getv(from_object, ['page_size']) is not None:
    setv(
        parent_object, ['_query', 'pageSize'], getv(from_object, ['page_size'])
    )

  if getv(from_object, ['page_token']) is not None:
    setv(
        parent_object,
        ['_query', 'pageToken'],
        getv(from_object, ['page_token']),
    )

  if getv(from_object, ['filter']) is not None:
    raise ValueError('filter parameter is not supported in Gemini API.')

  return to_object


def _ListBatchJobsConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}

  if getv(from_object, ['page_size']) is not None:
    setv(
        parent_object, ['_query', 'pageSize'], getv(from_object, ['page_size'])
    )

  if getv(from_object, ['page_token']) is not None:
    setv(
        parent_object,
        ['_query', 'pageToken'],
        getv(from_object, ['page_token']),
    )

  if getv(from_object, ['filter']) is not None:
    setv(parent_object, ['_query', 'filter'], getv(from_object, ['filter']))

  return to_object


def _ListBatchJobsParameters_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['config']) is not None:
    _ListBatchJobsConfig_to_mldev(getv(from_object, ['config']), to_object)

  return to_object


def _ListBatchJobsParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['config']) is not None:
    _ListBatchJobsConfig_to_vertex(getv(from_object, ['config']), to_object)

  return to_object


def _ListBatchJobsResponse_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['sdkHttpResponse']) is not None:
    setv(
        to_object, ['sdk_http_response'], getv(from_object, ['sdkHttpResponse'])
    )

  if getv(from_object, ['nextPageToken']) is not None:
    setv(to_object, ['next_page_token'], getv(from_object, ['nextPageToken']))

  if getv(from_object, ['operations']) is not None:
    setv(
        to_object,
        ['batch_jobs'],
        [
            _BatchJob_from_mldev(item, to_object)
            for item in getv(from_object, ['operations'])
        ],
    )

  return to_object


def _ListBatchJobsResponse_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['sdkHttpResponse']) is not None:
    setv(
        to_object, ['sdk_http_response'], getv(from_object, ['sdkHttpResponse'])
    )

  if getv(from_object, ['nextPageToken']) is not None:
    setv(to_object, ['next_page_token'], getv(from_object, ['nextPageToken']))

  if getv(from_object, ['batchPredictionJobs']) is not None:
    setv(
        to_object,
        ['batch_jobs'],
        [
            _BatchJob_from_vertex(item, to_object)
            for item in getv(from_object, ['batchPredictionJobs'])
        ],
    )

  return to_object


def _Part_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['function_call']) is not None:
    setv(to_object, ['functionCall'], getv(from_object, ['function_call']))

  if getv(from_object, ['code_execution_result']) is not None:
    setv(
        to_object,
        ['codeExecutionResult'],
        getv(from_object, ['code_execution_result']),
    )

  if getv(from_object, ['executable_code']) is not None:
    setv(to_object, ['executableCode'], getv(from_object, ['executable_code']))

  if getv(from_object, ['file_data']) is not None:
    setv(
        to_object,
        ['fileData'],
        _FileData_to_mldev(getv(from_object, ['file_data']), to_object),
    )

  if getv(from_object, ['function_response']) is not None:
    setv(
        to_object,
        ['functionResponse'],
        getv(from_object, ['function_response']),
    )

  if getv(from_object, ['inline_data']) is not None:
    setv(
        to_object,
        ['inlineData'],
        _Blob_to_mldev(getv(from_object, ['inline_data']), to_object),
    )

  if getv(from_object, ['text']) is not None:
    setv(to_object, ['text'], getv(from_object, ['text']))

  if getv(from_object, ['thought']) is not None:
    setv(to_object, ['thought'], getv(from_object, ['thought']))

  if getv(from_object, ['thought_signature']) is not None:
    setv(
        to_object,
        ['thoughtSignature'],
        getv(from_object, ['thought_signature']),
    )

  if getv(from_object, ['video_metadata']) is not None:
    setv(to_object, ['videoMetadata'], getv(from_object, ['video_metadata']))

  return to_object


def _SafetySetting_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['category']) is not None:
    setv(to_object, ['category'], getv(from_object, ['category']))

  if getv(from_object, ['method']) is not None:
    raise ValueError('method parameter is not supported in Gemini API.')

  if getv(from_object, ['threshold']) is not None:
    setv(to_object, ['threshold'], getv(from_object, ['threshold']))

  return to_object


def _Tool_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['function_declarations']) is not None:
    setv(
        to_object,
        ['functionDeclarations'],
        [item for item in getv(from_object, ['function_declarations'])],
    )

  if getv(from_object, ['retrieval']) is not None:
    raise ValueError('retrieval parameter is not supported in Gemini API.')

  if getv(from_object, ['google_search_retrieval']) is not None:
    setv(
        to_object,
        ['googleSearchRetrieval'],
        getv(from_object, ['google_search_retrieval']),
    )

  if getv(from_object, ['computer_use']) is not None:
    setv(to_object, ['computerUse'], getv(from_object, ['computer_use']))

  if getv(from_object, ['file_search']) is not None:
    setv(to_object, ['fileSearch'], getv(from_object, ['file_search']))

  if getv(from_object, ['code_execution']) is not None:
    setv(to_object, ['codeExecution'], getv(from_object, ['code_execution']))

  if getv(from_object, ['enterprise_web_search']) is not None:
    raise ValueError(
        'enterprise_web_search parameter is not supported in Gemini API.'
    )

  if getv(from_object, ['google_maps']) is not None:
    setv(
        to_object,
        ['googleMaps'],
        _GoogleMaps_to_mldev(getv(from_object, ['google_maps']), to_object),
    )

  if getv(from_object, ['google_search']) is not None:
    setv(
        to_object,
        ['googleSearch'],
        _GoogleSearch_to_mldev(getv(from_object, ['google_search']), to_object),
    )

  if getv(from_object, ['url_context']) is not None:
    setv(to_object, ['urlContext'], getv(from_object, ['url_context']))

  return to_object


class Batches(_api_module.BaseModule):

  def _create(
      self,
      *,
      model: Optional[str] = None,
      src: Union[types.BatchJobSourceUnion, types.BatchJobSourceUnionDict],
      config: Optional[types.CreateBatchJobConfigOrDict] = None,
  ) -> types.BatchJob:
    parameter_model = types._CreateBatchJobParameters(
        model=model,
        src=src,
        config=config,
    )

    request_url_dict: Optional[dict[str, str]]
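
    # Pick the converter for the active backend, fill the URL template from
    # the converter's '_url' entry, append any '_query' parameters, and send
    # the remaining dict as the request body.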

    if self._api_client.vertexai:
      request_dict = _CreateBatchJobParameters_to_vertex(
          self._api_client, parameter_model
      )
      request_url_dict = request_dict.get('_url')
      if request_url_dict:
        path = 'batchPredictionJobs'.format_map(request_url_dict)
      else:
        path = 'batchPredictionJobs'
    else:
      request_dict = _CreateBatchJobParameters_to_mldev(
          self._api_client, parameter_model
      )
      request_url_dict = request_dict.get('_url')
      if request_url_dict:
        path = '{model}:batchGenerateContent'.format_map(request_url_dict)
      else:
        path = '{model}:batchGenerateContent'
    query_params = request_dict.get('_query')
    if query_params:
      path = f'{path}?{urlencode(query_params)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = None
    if (
        parameter_model.config is not None
        and parameter_model.config.http_options is not None
    ):
      http_options = parameter_model.config.http_options

    request_dict = _common.convert_to_dict(request_dict)
    request_dict = _common.encode_unserializable_types(request_dict)

    response = self._api_client.request(
        'post', path, request_dict, http_options
    )

    response_dict = {} if not response.body else json.loads(response.body)

    if self._api_client.vertexai:
      response_dict = _BatchJob_from_vertex(response_dict)

    if not self._api_client.vertexai:
      response_dict = _BatchJob_from_mldev(response_dict)

    return_value = types.BatchJob._from_response(
        response=response_dict, kwargs=parameter_model.model_dump()
    )

    self._api_client._verify_response(return_value)
    return return_value

  def _create_embeddings(
      self,
      *,
      model: Optional[str] = None,
      src: types.EmbeddingsBatchJobSourceOrDict,
      config: Optional[types.CreateEmbeddingsBatchJobConfigOrDict] = None,
  ) -> types.BatchJob:
    parameter_model = types._CreateEmbeddingsBatchJobParameters(
        model=model,
        src=src,
        config=config,
    )

    request_url_dict: Optional[dict[str, str]]
    if self._api_client.vertexai:
      raise ValueError(
          'This method is only supported in the Gemini Developer API client.'
      )
    else:
      request_dict = _CreateEmbeddingsBatchJobParameters_to_mldev(
          self._api_client, parameter_model
      )
      request_url_dict = request_dict.get('_url')
      if request_url_dict:
        path = '{model}:asyncBatchEmbedContent'.format_map(request_url_dict)
      else:
        path = '{model}:asyncBatchEmbedContent'

    query_params = request_dict.get('_query')
    if query_params:
      path = f'{path}?{urlencode(query_params)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = None
    if (
        parameter_model.config is not None
        and parameter_model.config.http_options is not None
    ):
      http_options = parameter_model.config.http_options

    request_dict = _common.convert_to_dict(request_dict)
    request_dict = _common.encode_unserializable_types(request_dict)

    response = self._api_client.request(
        'post', path, request_dict, http_options
    )

    response_dict = {} if not response.body else json.loads(response.body)

    if not self._api_client.vertexai:
      response_dict = _BatchJob_from_mldev(response_dict)

    return_value = types.BatchJob._from_response(
        response=response_dict, kwargs=parameter_model.model_dump()
    )

    self._api_client._verify_response(return_value)
    return return_value

  def get(
      self, *, name: str, config: Optional[types.GetBatchJobConfigOrDict] = None
  ) -> types.BatchJob:
    """Gets a batch job.

    Args:
      name (str): A fully-qualified BatchJob resource name or ID.
        Example: "projects/.../locations/.../batchPredictionJobs/456", or
        "456" when project and location are initialized in the Vertex AI
        client, or "batches/abc" with the Gemini Developer API client.

    Returns:
      A BatchJob object that contains details about the batch job.

    Usage:

    .. code-block:: python

      batch_job = client.batches.get(name='123456789')
      print(f"Batch job: {batch_job.name}, state {batch_job.state}")
    """

    parameter_model = types._GetBatchJobParameters(
        name=name,
        config=config,
    )

    request_url_dict: Optional[dict[str, str]]

    if self._api_client.vertexai:
      request_dict = _GetBatchJobParameters_to_vertex(
          self._api_client, parameter_model
      )
      request_url_dict = request_dict.get('_url')
      if request_url_dict:
        path = 'batchPredictionJobs/{name}'.format_map(request_url_dict)
      else:
        path = 'batchPredictionJobs/{name}'
    else:
      request_dict = _GetBatchJobParameters_to_mldev(
          self._api_client, parameter_model
      )
      request_url_dict = request_dict.get('_url')
      if request_url_dict:
        path = 'batches/{name}'.format_map(request_url_dict)
      else:
        path = 'batches/{name}'
    query_params = request_dict.get('_query')
    if query_params:
      path = f'{path}?{urlencode(query_params)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = None
    if (
        parameter_model.config is not None
        and parameter_model.config.http_options is not None
    ):
      http_options = parameter_model.config.http_options

    request_dict = _common.convert_to_dict(request_dict)
    request_dict = _common.encode_unserializable_types(request_dict)

    response = self._api_client.request('get', path, request_dict, http_options)

    response_dict = {} if not response.body else json.loads(response.body)

    if self._api_client.vertexai:
      response_dict = _BatchJob_from_vertex(response_dict)

    if not self._api_client.vertexai:
      response_dict = _BatchJob_from_mldev(response_dict)

    return_value = types.BatchJob._from_response(
        response=response_dict, kwargs=parameter_model.model_dump()
    )

    self._api_client._verify_response(return_value)
    return return_value

  def cancel(
      self,
      *,
      name: str,
      config: Optional[types.CancelBatchJobConfigOrDict] = None,
  ) -> None:
    """Cancels a batch job.

    Only available for batch jobs that are running or pending.

    Args:
      name (str): A fully-qualified BatchJob resource name or ID.
        Example: "projects/.../locations/.../batchPredictionJobs/456", or
        "456" when project and location are initialized in the Vertex AI
        client, or "batches/abc" with the Gemini Developer API client.

    Usage:

    .. code-block:: python

      client.batches.cancel(name='123456789')
    """

    parameter_model = types._CancelBatchJobParameters(
        name=name,
        config=config,
    )

    request_url_dict: Optional[dict[str, str]]

    if self._api_client.vertexai:
      request_dict = _CancelBatchJobParameters_to_vertex(
          self._api_client, parameter_model
      )
      request_url_dict = request_dict.get('_url')
      if request_url_dict:
        path = 'batchPredictionJobs/{name}:cancel'.format_map(request_url_dict)
      else:
        path = 'batchPredictionJobs/{name}:cancel'
    else:
      request_dict = _CancelBatchJobParameters_to_mldev(
          self._api_client, parameter_model
      )
      request_url_dict = request_dict.get('_url')
      if request_url_dict:
        path = 'batches/{name}:cancel'.format_map(request_url_dict)
      else:
        path = 'batches/{name}:cancel'
    query_params = request_dict.get('_query')
    if query_params:
      path = f'{path}?{urlencode(query_params)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = None
    if (
        parameter_model.config is not None
        and parameter_model.config.http_options is not None
    ):
      http_options = parameter_model.config.http_options

    request_dict = _common.convert_to_dict(request_dict)
    request_dict = _common.encode_unserializable_types(request_dict)

    self._api_client.request('post', path, request_dict, http_options)

  def _list(
      self, *, config: Optional[types.ListBatchJobsConfigOrDict] = None
  ) -> types.ListBatchJobsResponse:
    parameter_model = types._ListBatchJobsParameters(
        config=config,
    )

    request_url_dict: Optional[dict[str, str]]

    if self._api_client.vertexai:
      request_dict = _ListBatchJobsParameters_to_vertex(parameter_model)
      request_url_dict = request_dict.get('_url')
      if request_url_dict:
        path = 'batchPredictionJobs'.format_map(request_url_dict)
      else:
        path = 'batchPredictionJobs'
    else:
      request_dict = _ListBatchJobsParameters_to_mldev(parameter_model)
      request_url_dict = request_dict.get('_url')
      if request_url_dict:
        path = 'batches'.format_map(request_url_dict)
      else:
        path = 'batches'
    query_params = request_dict.get('_query')
    if query_params:
      path = f'{path}?{urlencode(query_params)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = None
    if (
        parameter_model.config is not None
        and parameter_model.config.http_options is not None
    ):
      http_options = parameter_model.config.http_options

    request_dict = _common.convert_to_dict(request_dict)
    request_dict = _common.encode_unserializable_types(request_dict)

    response = self._api_client.request('get', path, request_dict, http_options)

    response_dict = {} if not response.body else json.loads(response.body)

    if self._api_client.vertexai:
      response_dict = _ListBatchJobsResponse_from_vertex(response_dict)

    if not self._api_client.vertexai:
      response_dict = _ListBatchJobsResponse_from_mldev(response_dict)

    return_value = types.ListBatchJobsResponse._from_response(
        response=response_dict, kwargs=parameter_model.model_dump()
    )
    return_value.sdk_http_response = types.HttpResponse(
        headers=response.headers
    )
    self._api_client._verify_response(return_value)
    return return_value

  def delete(
      self,
      *,
      name: str,
      config: Optional[types.DeleteBatchJobConfigOrDict] = None,
  ) -> types.DeleteResourceJob:
    """Deletes a batch job.

    Args:
      name (str): A fully-qualified BatchJob resource name or ID.
        Example: "projects/.../locations/.../batchPredictionJobs/456", or
        "456" when project and location are initialized in the Vertex AI
        client, or "batches/abc" with the Gemini Developer API client.

    Returns:
      A DeleteResourceJob object that shows the status of the deletion.

    Usage:

    .. code-block:: python

      client.batches.delete(name='123456789')
    """

    parameter_model = types._DeleteBatchJobParameters(
        name=name,
        config=config,
    )

    request_url_dict: Optional[dict[str, str]]

    if self._api_client.vertexai:
      request_dict = _DeleteBatchJobParameters_to_vertex(
          self._api_client, parameter_model
      )
      request_url_dict = request_dict.get('_url')
      if request_url_dict:
        path = 'batchPredictionJobs/{name}'.format_map(request_url_dict)
      else:
        path = 'batchPredictionJobs/{name}'
    else:
      request_dict = _DeleteBatchJobParameters_to_mldev(
          self._api_client, parameter_model
      )
      request_url_dict = request_dict.get('_url')
      if request_url_dict:
        path = 'batches/{name}'.format_map(request_url_dict)
      else:
        path = 'batches/{name}'
    query_params = request_dict.get('_query')
    if query_params:
      path = f'{path}?{urlencode(query_params)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = None
    if (
        parameter_model.config is not None
        and parameter_model.config.http_options is not None
    ):
      http_options = parameter_model.config.http_options

    request_dict = _common.convert_to_dict(request_dict)
    request_dict = _common.encode_unserializable_types(request_dict)

    response = self._api_client.request(
        'delete', path, request_dict, http_options
    )

    response_dict = {} if not response.body else json.loads(response.body)

    if self._api_client.vertexai:
      response_dict = _DeleteResourceJob_from_vertex(response_dict)

    if not self._api_client.vertexai:
      response_dict = _DeleteResourceJob_from_mldev(response_dict)

    return_value = types.DeleteResourceJob._from_response(
        response=response_dict, kwargs=parameter_model.model_dump()
    )
    return_value.sdk_http_response = types.HttpResponse(
        headers=response.headers
    )
    self._api_client._verify_response(return_value)
    return return_value

  def create(
      self,
      *,
      model: str,
      src: types.BatchJobSourceUnionDict,
      config: Optional[types.CreateBatchJobConfigOrDict] = None,
  ) -> types.BatchJob:
    """Creates a batch job.

    Args:
      model (str): The model to use for the batch job.
      src: The source of the batch job. Vertex AI currently supports one or
        more GCS URIs or a BigQuery URI, e.g. "gs://path/to/input/data" or
        "bq://projectId.bqDatasetId.bqTableId". The Gemini Developer API
        supports a list of inlined requests or a file name, e.g.
        "files/file_name".
      config (CreateBatchJobConfig): Optional configuration for the batch job.

    Returns:
      A BatchJob object that contains details about the batch job.

    Usage:

    .. code-block:: python

      batch_job = client.batches.create(
          model="gemini-2.0-flash-001",
          src="gs://path/to/input/data",
      )
      print(batch_job.state)
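
      # With the Gemini Developer API, inlined requests are also accepted
      # (a minimal sketch; the model name and prompt are illustrative):
      batch_job = client.batches.create(
          model="gemini-2.0-flash-001",
          src=[{"contents": [{"parts": [{"text": "Hello!"}], "role": "user"}]}],
      )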
    """
    src = t.t_batch_job_source(self._api_client, src)

    # Convert all dicts to Pydantic objects.
    parameter_model = types._CreateBatchJobParameters(
        model=model,
        src=src,
        config=config,
    )

    if self._api_client.vertexai:
      config = _extra_utils.format_destination(src, parameter_model.config)
    return self._create(model=model, src=src, config=config)

  def create_embeddings(
      self,
      *,
      model: str,
      src: types.EmbeddingsBatchJobSourceOrDict,
      config: Optional[types.CreateEmbeddingsBatchJobConfigOrDict] = None,
  ) -> types.BatchJob:
    """**Experimental** Creates an embedding batch job.

    Args:
      model (str): The model to use for the batch job.
      src: The source of the batch job. The Gemini Developer API supports a
        list of inlined requests or a file name, e.g. "files/file_name".
      config (CreateBatchJobConfig): Optional configuration for the batch job.

    Returns:
      A BatchJob object that contains details about the batch job.

    Usage:

    .. code-block:: python

      batch_job = client.batches.create_embeddings(
          model="text-embedding-004",
          src="files/my_embedding_input",
      )
      print(batch_job.state)
    """
    import warnings

    warnings.warn(
        'batches.create_embeddings() is experimental and may change without'
        ' notice.',
        category=_common.ExperimentalWarning,
        stacklevel=2,  # Attribute the warning to the caller, not this method.
    )
    src = t.t_embedding_batch_job_source(self._api_client, src)

    # Convert all dicts to Pydantic objects.
    parameter_model = types._CreateEmbeddingsBatchJobParameters(
        model=model,
        src=src,
        config=config,
    )

    if self._api_client.vertexai:
      raise ValueError('Vertex AI does not support batches.create_embeddings.')
    else:
      return self._create_embeddings(model=model, src=src, config=config)

  def list(
      self, *, config: Optional[types.ListBatchJobsConfigOrDict] = None
  ) -> Pager[types.BatchJob]:
    """Lists batch jobs.

    Args:
      config (ListBatchJobsConfig): Optional configuration for the list request.

    Returns:
      A Pager object that contains one page of batch jobs. When iterating over
      the pager, it automatically fetches the next page if there are more.

    Usage:

    .. code-block:: python

      batch_jobs = client.batches.list(config={"page_size": 10})
      for batch_job in batch_jobs:
        print(f"Batch job: {batch_job.name}, state {batch_job.state}")
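
      # Pages can also be advanced manually; next_page() raises IndexError
      # once no further pages remain.
      first_page = client.batches.list(config={"page_size": 10})
      print(first_page.page)
      first_page.next_page()
      print(first_page.page)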
    """
    if config is None:
      config = types.ListBatchJobsConfig()
    return Pager(
        'batch_jobs',
        self._list,
        self._list(config=config),
        config,
    )


class AsyncBatches(_api_module.BaseModule):

  async def _create(
      self,
      *,
      model: Optional[str] = None,
      src: Union[types.BatchJobSourceUnion, types.BatchJobSourceUnionDict],
      config: Optional[types.CreateBatchJobConfigOrDict] = None,
  ) -> types.BatchJob:
    parameter_model = types._CreateBatchJobParameters(
        model=model,
        src=src,
        config=config,
    )

    request_url_dict: Optional[dict[str, str]]

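    # Values under the request's '_url' key fill the path template below;
    # values under '_query' become query-string parameters.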
    if self._api_client.vertexai:
      request_dict = _CreateBatchJobParameters_to_vertex(
          self._api_client, parameter_model
      )
      request_url_dict = request_dict.get('_url')
      if request_url_dict:
        path = 'batchPredictionJobs'.format_map(request_url_dict)
      else:
        path = 'batchPredictionJobs'
    else:
      request_dict = _CreateBatchJobParameters_to_mldev(
          self._api_client, parameter_model
      )
      request_url_dict = request_dict.get('_url')
      if request_url_dict:
        path = '{model}:batchGenerateContent'.format_map(request_url_dict)
      else:
        path = '{model}:batchGenerateContent'
    query_params = request_dict.get('_query')
    if query_params:
      path = f'{path}?{urlencode(query_params)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = None
    if (
        parameter_model.config is not None
        and parameter_model.config.http_options is not None
    ):
      http_options = parameter_model.config.http_options

    request_dict = _common.convert_to_dict(request_dict)
    request_dict = _common.encode_unserializable_types(request_dict)

    response = await self._api_client.async_request(
        'post', path, request_dict, http_options
    )

    response_dict = {} if not response.body else json.loads(response.body)

    if self._api_client.vertexai:
      response_dict = _BatchJob_from_vertex(response_dict)

    if not self._api_client.vertexai:
      response_dict = _BatchJob_from_mldev(response_dict)

    return_value = types.BatchJob._from_response(
        response=response_dict, kwargs=parameter_model.model_dump()
    )

    self._api_client._verify_response(return_value)
    return return_value

  async def _create_embeddings(
      self,
      *,
      model: Optional[str] = None,
      src: types.EmbeddingsBatchJobSourceOrDict,
      config: Optional[types.CreateEmbeddingsBatchJobConfigOrDict] = None,
  ) -> types.BatchJob:
    parameter_model = types._CreateEmbeddingsBatchJobParameters(
        model=model,
        src=src,
        config=config,
    )

    request_url_dict: Optional[dict[str, str]]
    if self._api_client.vertexai:
      raise ValueError(
          'This method is only supported in the Gemini Developer client.'
      )
    else:
      request_dict = _CreateEmbeddingsBatchJobParameters_to_mldev(
          self._api_client, parameter_model
      )
      request_url_dict = request_dict.get('_url')
      if request_url_dict:
        path = '{model}:asyncBatchEmbedContent'.format_map(request_url_dict)
      else:
        path = '{model}:asyncBatchEmbedContent'

    query_params = request_dict.get('_query')
    if query_params:
      path = f'{path}?{urlencode(query_params)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = None
    if (
        parameter_model.config is not None
        and parameter_model.config.http_options is not None
    ):
      http_options = parameter_model.config.http_options

    request_dict = _common.convert_to_dict(request_dict)
    request_dict = _common.encode_unserializable_types(request_dict)

    response = await self._api_client.async_request(
        'post', path, request_dict, http_options
    )

    response_dict = {} if not response.body else json.loads(response.body)

    if not self._api_client.vertexai:
      response_dict = _BatchJob_from_mldev(response_dict)

    return_value = types.BatchJob._from_response(
        response=response_dict, kwargs=parameter_model.model_dump()
    )

    self._api_client._verify_response(return_value)
    return return_value

  async def get(
      self, *, name: str, config: Optional[types.GetBatchJobConfigOrDict] = None
  ) -> types.BatchJob:
    """Gets a batch job.

    Args:
      name (str): A fully-qualified BatchJob resource name or ID. Example:
        "projects/.../locations/.../batchPredictionJobs/456" or "456" when
        project and location are initialized in the Vertex AI client, or
        "batches/abc" when using the Gemini Developer API client.

    Returns:
      A BatchJob object that contains details about the batch job.

    Usage:

    .. code-block:: python

      batch_job = await client.aio.batches.get(name='123456789')
      print(f"Batch job: {batch_job.name}, state {batch_job.state}")
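
      # A sketch of polling until the job reaches a terminal state; the
      # JOB_STATE_* names below are assumptions about the state enum.
      import asyncio
      while batch_job.state.name not in (
          'JOB_STATE_SUCCEEDED',
          'JOB_STATE_FAILED',
          'JOB_STATE_CANCELLED',
      ):
        await asyncio.sleep(30)
        batch_job = await client.aio.batches.get(name=batch_job.name)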
    """

    parameter_model = types._GetBatchJobParameters(
        name=name,
        config=config,
    )

    request_url_dict: Optional[dict[str, str]]

    if self._api_client.vertexai:
      request_dict = _GetBatchJobParameters_to_vertex(
          self._api_client, parameter_model
      )
      request_url_dict = request_dict.get('_url')
      if request_url_dict:
        path = 'batchPredictionJobs/{name}'.format_map(request_url_dict)
      else:
        path = 'batchPredictionJobs/{name}'
    else:
      request_dict = _GetBatchJobParameters_to_mldev(
          self._api_client, parameter_model
      )
      request_url_dict = request_dict.get('_url')
      if request_url_dict:
        path = 'batches/{name}'.format_map(request_url_dict)
      else:
        path = 'batches/{name}'
    query_params = request_dict.get('_query')
    if query_params:
      path = f'{path}?{urlencode(query_params)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = None
    if (
        parameter_model.config is not None
        and parameter_model.config.http_options is not None
    ):
      http_options = parameter_model.config.http_options

    request_dict = _common.convert_to_dict(request_dict)
    request_dict = _common.encode_unserializable_types(request_dict)

    response = await self._api_client.async_request(
        'get', path, request_dict, http_options
    )

    response_dict = {} if not response.body else json.loads(response.body)

    if self._api_client.vertexai:
      response_dict = _BatchJob_from_vertex(response_dict)

    if not self._api_client.vertexai:
      response_dict = _BatchJob_from_mldev(response_dict)

    return_value = types.BatchJob._from_response(
        response=response_dict, kwargs=parameter_model.model_dump()
    )

    self._api_client._verify_response(return_value)
    return return_value

  async def cancel(
      self,
      *,
      name: str,
      config: Optional[types.CancelBatchJobConfigOrDict] = None,
  ) -> None:
    """Cancels a batch job.

    Only available for batch jobs that are running or pending.

    Args:
      name (str): A fully-qualified BatchJob resource name or ID. Example:
        "projects/.../locations/.../batchPredictionJobs/456" or "456" when
        project and location are initialized in the Vertex AI client, or
        "batches/abc" when using the Gemini Developer API client.

    Usage:

    .. code-block:: python

      await client.aio.batches.cancel(name='123456789')
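
      # Cancellation happens on the service side; a sketch of confirming
      # the job's state afterwards:
      batch_job = await client.aio.batches.get(name='123456789')
      print(batch_job.state)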
    """

    parameter_model = types._CancelBatchJobParameters(
        name=name,
        config=config,
    )

    request_url_dict: Optional[dict[str, str]]

    if self._api_client.vertexai:
      request_dict = _CancelBatchJobParameters_to_vertex(
          self._api_client, parameter_model
      )
      request_url_dict = request_dict.get('_url')
      if request_url_dict:
        path = 'batchPredictionJobs/{name}:cancel'.format_map(request_url_dict)
      else:
        path = 'batchPredictionJobs/{name}:cancel'
    else:
      request_dict = _CancelBatchJobParameters_to_mldev(
          self._api_client, parameter_model
      )
      request_url_dict = request_dict.get('_url')
      if request_url_dict:
        path = 'batches/{name}:cancel'.format_map(request_url_dict)
      else:
        path = 'batches/{name}:cancel'
    query_params = request_dict.get('_query')
    if query_params:
      path = f'{path}?{urlencode(query_params)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = None
    if (
        parameter_model.config is not None
        and parameter_model.config.http_options is not None
    ):
      http_options = parameter_model.config.http_options

    request_dict = _common.convert_to_dict(request_dict)
    request_dict = _common.encode_unserializable_types(request_dict)

    response = await self._api_client.async_request(
        'post', path, request_dict, http_options
    )

  async def _list(
      self, *, config: Optional[types.ListBatchJobsConfigOrDict] = None
  ) -> types.ListBatchJobsResponse:
    parameter_model = types._ListBatchJobsParameters(
        config=config,
    )

    request_url_dict: Optional[dict[str, str]]

    if self._api_client.vertexai:
      request_dict = _ListBatchJobsParameters_to_vertex(parameter_model)
      request_url_dict = request_dict.get('_url')
      if request_url_dict:
        path = 'batchPredictionJobs'.format_map(request_url_dict)
      else:
        path = 'batchPredictionJobs'
    else:
      request_dict = _ListBatchJobsParameters_to_mldev(parameter_model)
      request_url_dict = request_dict.get('_url')
      if request_url_dict:
        path = 'batches'.format_map(request_url_dict)
      else:
        path = 'batches'
    query_params = request_dict.get('_query')
    if query_params:
      path = f'{path}?{urlencode(query_params)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = None
    if (
        parameter_model.config is not None
        and parameter_model.config.http_options is not None
    ):
      http_options = parameter_model.config.http_options

    request_dict = _common.convert_to_dict(request_dict)
    request_dict = _common.encode_unserializable_types(request_dict)

    response = await self._api_client.async_request(
        'get', path, request_dict, http_options
    )

    response_dict = {} if not response.body else json.loads(response.body)

    if self._api_client.vertexai:
      response_dict = _ListBatchJobsResponse_from_vertex(response_dict)

    if not self._api_client.vertexai:
      response_dict = _ListBatchJobsResponse_from_mldev(response_dict)

    return_value = types.ListBatchJobsResponse._from_response(
        response=response_dict, kwargs=parameter_model.model_dump()
    )
    return_value.sdk_http_response = types.HttpResponse(
        headers=response.headers
    )
    self._api_client._verify_response(return_value)
    return return_value

  async def delete(
      self,
      *,
      name: str,
      config: Optional[types.DeleteBatchJobConfigOrDict] = None,
  ) -> types.DeleteResourceJob:
    """Deletes a batch job.

    Args:
      name (str): A fully-qualified BatchJob resource name or ID.
        Example: "projects/.../locations/.../batchPredictionJobs/456" or "456"
          when project and location are initialized in the client.

    Returns:
      A DeleteResourceJob object that shows the status of the deletion.

    Usage:

    .. code-block:: python

      await client.aio.batches.delete(name='123456789')
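
      # The returned object reports the deletion's progress; `done` is an
      # assumed field on DeleteResourceJob.
      delete_job = await client.aio.batches.delete(name='123456789')
      print(delete_job.done)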
    """

    parameter_model = types._DeleteBatchJobParameters(
        name=name,
        config=config,
    )

    request_url_dict: Optional[dict[str, str]]

    if self._api_client.vertexai:
      request_dict = _DeleteBatchJobParameters_to_vertex(
          self._api_client, parameter_model
      )
      request_url_dict = request_dict.get('_url')
      if request_url_dict:
        path = 'batchPredictionJobs/{name}'.format_map(request_url_dict)
      else:
        path = 'batchPredictionJobs/{name}'
    else:
      request_dict = _DeleteBatchJobParameters_to_mldev(
          self._api_client, parameter_model
      )
      request_url_dict = request_dict.get('_url')
      if request_url_dict:
        path = 'batches/{name}'.format_map(request_url_dict)
      else:
        path = 'batches/{name}'
    query_params = request_dict.get('_query')
    if query_params:
      path = f'{path}?{urlencode(query_params)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = None
    if (
        parameter_model.config is not None
        and parameter_model.config.http_options is not None
    ):
      http_options = parameter_model.config.http_options

    request_dict = _common.convert_to_dict(request_dict)
    request_dict = _common.encode_unserializable_types(request_dict)

    response = await self._api_client.async_request(
        'delete', path, request_dict, http_options
    )

    response_dict = {} if not response.body else json.loads(response.body)

    if self._api_client.vertexai:
      response_dict = _DeleteResourceJob_from_vertex(response_dict)

    if not self._api_client.vertexai:
      response_dict = _DeleteResourceJob_from_mldev(response_dict)

    return_value = types.DeleteResourceJob._from_response(
        response=response_dict, kwargs=parameter_model.model_dump()
    )
    return_value.sdk_http_response = types.HttpResponse(
        headers=response.headers
    )
    self._api_client._verify_response(return_value)
    return return_value

  async def create(
      self,
      *,
      model: str,
      src: types.BatchJobSourceUnionDict,
      config: Optional[types.CreateBatchJobConfigOrDict] = None,
  ) -> types.BatchJob:
    """Creates a batch job asynchronously.

    Args:
      model (str): The model to use for the batch job.
      src: The source of the batch job. Vertex AI supports one or more GCS
        URIs or a BigQuery URI. Example: "gs://path/to/input/data" or
        "bq://projectId.bqDatasetId.bqTableId". The Gemini Developer API
        supports a list of inlined requests or a file name. Example:
        "files/file_name".
      config (CreateBatchJobConfig): Optional configuration for the batch job.

    Returns:
      A BatchJob object that contains details about the batch job.

    Usage:

    .. code-block:: python

      batch_job = await client.aio.batches.create(
          model="gemini-2.0-flash-001",
          src="gs://path/to/input/data",
      )
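      print(batch_job.state)

      # With the Gemini Developer API, src can instead be a list of inlined
      # requests or an uploaded file; "files/my_input_file" below is a
      # hypothetical file name.
      batch_job = await client.aio.batches.create(
          model="gemini-2.0-flash-001",
          src="files/my_input_file",
      )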
    """
    src = t.t_batch_job_source(self._api_client, src)

    # Convert all dicts to Pydantic objects.
    parameter_model = types._CreateBatchJobParameters(
        model=model,
        src=src,
        config=config,
    )

    if self._api_client.vertexai:
      config = _extra_utils.format_destination(src, parameter_model.config)
    return await self._create(model=model, src=src, config=config)

  async def create_embeddings(
      self,
      *,
      model: str,
      src: types.EmbeddingsBatchJobSourceOrDict,
      config: Optional[types.CreateEmbeddingsBatchJobConfigOrDict] = None,
  ) -> types.BatchJob:
    """**Experimental** Creates an embedding batch job asynchronously.

    Args:
      model (str): The model to use for the batch job.
      src: The source of the batch job. The Gemini Developer API supports a
        list of inlined requests or a file name. Example: "files/file_name".
      config (CreateBatchJobConfig): Optional configuration for the batch job.

    Returns:
      A BatchJob object that contains details about the batch job.

    Usage:

    .. code-block:: python

      batch_job = await client.aio.batches.create_embeddings(
          model="text-embedding-004",
          src="files/my_embedding_input",
      )
      print(batch_job.state)
    """
    import warnings

    warnings.warn(
        'batches.create_embeddings() is experimental and may change without'
        ' notice.',
        category=_common.ExperimentalWarning,
        stacklevel=2,  # Attribute the warning to the caller, not this method.
    )
    src = t.t_embedding_batch_job_source(self._api_client, src)

    # Convert all dicts to Pydantic objects.
    parameter_model = types._CreateEmbeddingsBatchJobParameters(
        model=model,
        src=src,
        config=config,
    )

    if self._api_client.vertexai:
      raise ValueError('Vertex AI does not support batches.create_embeddings.')
    else:
      return await self._create_embeddings(model=model, src=src, config=config)

  async def list(
      self, *, config: Optional[types.ListBatchJobsConfigOrDict] = None
  ) -> AsyncPager[types.BatchJob]:
    """Lists batch jobs asynchronously.

    Args:
      config (ListBatchJobsConfig): Optional configuration for the list request.

    Returns:
      An AsyncPager object that contains one page of batch jobs. When
      iterating over the pager, it automatically fetches the next page if
      there are more.

    Usage:

    .. code-block:: python

      batch_jobs = await client.aio.batches.list(config={'page_size': 5})
      print(f"current page: {batch_jobs.page}")
      await batch_jobs.next_page()
      print(f"next page: {batch_jobs.page}")
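
      # AsyncPager also supports async iteration across all pages:
      async for batch_job in await client.aio.batches.list():
        print(batch_job.name)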
    """
    if config is None:
      config = types.ListBatchJobsConfig()
    return AsyncPager(
        'batch_jobs',
        self._list,
        await self._list(config=config),
        config,
    )
