It isn't clear to me why I must send 3 records before suppress() emits anything (if I send only 2, nothing is emitted). I am using Kafka Streams 2.5.0.

The properties used on the Kafka Streams (consumer) side are:

    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getAppId() + Constants.APP_ID);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass());
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

And on the producer side:

    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ...);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ...);
    properties.put(ProducerConfig.ACKS_CONFIG, "all");

Answer:

As you are using suppress() (with the untilWindowCloses config), the operator will only emit a final result once "stream-time" advances. "Stream-time" is computed as a function over the record timestamps, so if no records are processed, "stream-time" does not advance and suppress() never emits anything. Sending more records is therefore the only way "stream-time" can be advanced.

Note: for a streaming use case, the assumption is that data never stops, so this is not an issue in an actual deployment. Reading from a file, as you do, is not a real stream processing use case; I assume you read from a file for a test, and in that case your input file should contain a few more records to advance stream-time accordingly.
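For reference, here is a minimal sketch of the kind of windowed aggregation plus suppress() topology the answer describes. The question does not show its topology, so the topic names, window size, and serdes below are assumptions for illustration only:

    import java.time.Duration;
    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.Suppressed;
    import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
    import org.apache.kafka.streams.kstream.TimeWindows;

    public class SuppressExample {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();

            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                // 1-minute tumbling windows; with zero grace, a window closes
                // as soon as stream-time passes its end.
                .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ZERO))
                .count()
                // Buffer results and emit exactly one final count per window,
                // but only after stream-time (the max record timestamp seen so
                // far) moves past window-end + grace. A record whose timestamp
                // falls inside the window can never trigger the emission itself;
                // a later record is needed to push stream-time forward.
                .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
                .toStream()
                .foreach((windowedKey, count) -> System.out.println(windowedKey + " -> " + count));

            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

            new KafkaStreams(builder.build(), props).start();
        }
    }

Note that using WallclockTimestampExtractor (as in the question's configuration) does not change this behavior: each record is stamped with the wall-clock time at which it is processed, but stream-time is still only recomputed when a record actually arrives, so an idle input leaves the suppress() buffer closed.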