Content-Length: 272327 | pFad | http://github.com/confluentinc/confluent-kafka-python/issues/1195

41 How to deal with msg.error() in batch API · Issue #1195 · confluentinc/confluent-kafka-python · GitHub
Skip to content

How to deal with msg.error() in batch API  #1195

@nikunjy

Description

@nikunjy

Description

How should I deal with msg.error() ?
In the consume API I get a list of messages. Documentation says I need to check the error for each message in the list.

My consumer is enable.auto.commit as False and I need at least once semantics.

So if I get a list of messages in consume and one of them has error() is not None how do I manage my commit because I don't want to skip over certain offset in a partition. So for two partitions imagine I get this

msgs = consumer.consume()
# returns [ msg(partition=0, offset=1), msg(partition=1, offset=1), msg has error,  msg(partition=0, offset=3)]

success = []
for msg in msgs:
 if msg.error() is not None:
   # now what ? 
 else:
  success.append(msg)


for msg in success:
  # Is this wrong because the msg with error could be parition=0 offset=2?
  consumer.commit(msg)

Am I supposed to take care of accounting for this myself somehow?

How to reproduce

Consumer(
            {
                "bootstrap.servers": consumer_config.bootstrap_servers,
                "group.id": consumer_config.group_id,
                "enable.auto.commit": False,
                "auto.offset.reset": "earliest",
                # This exists to recover from lost connection to kafka
                "topic.metadata.refresh.interval.ms": 10000,
                # TODO add max poll interval ms, it should be higher in certain cases where each
                # consume operation is long running
            }
        )

Checklist

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 1.7.0
  • Apache Kafka broker version:
  • Client configuration: {...}
  • Operating system: distroless
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue

Metadata

Metadata

Assignees

No one assigned

    Labels

    questionA question about how to use or about expected behavior of the libraryworkaroundFor tagging issues that have a workaround documented in the comments or description

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions









      ApplySandwichStrip

      pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


      --- a PPN by Garber Painting Akron. With Image Size Reduction included!

      Fetched URL: http://github.com/confluentinc/confluent-kafka-python/issues/1195

      Alternative Proxies:

      Alternative Proxy

      pFad Proxy

      pFad v3 Proxy

      pFad v4 Proxy